This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new b874180 IGNITE-14408 Implement the Vault Service on top of RocksDB
(#174)
b874180 is described below
commit b87418052da15772bdf905cb7a07af7d05f79eb6
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Mon Jun 28 15:45:41 2021 +0300
IGNITE-14408 Implement the Vault Service on top of RocksDB (#174)
---
.../internal/affinity/AffinityManagerTest.java | 3 +-
.../org/apache/ignite/app/IgnitionManager.java | 1 +
modules/core/pom.xml | 6 +
.../apache/ignite/internal/util/IgniteUtils.java | 101 +++++--
.../java/org/apache/ignite/lang/ByteArray.java | 40 +--
.../ignite/internal/util/IgniteUtilsTest.java | 58 +++++
.../internal/metastorage/MetaStorageManager.java | 152 +++++------
.../runner/app/DynamicTableCreationTest.java | 18 +-
.../ignite/internal/runner/app/IgnitionTest.java | 24 +-
.../internal/runner/app/TableCreationTest.java | 16 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 20 +-
.../apache/ignite/internal/app/IgnitionImpl.java | 47 +++-
.../storage/DistributedConfigurationStorage.java | 5 +-
.../storage/LocalConfigurationStorage.java | 6 +-
.../ignite/internal/app/IgnitionCleaner.java} | 26 +-
modules/vault/pom.xml | 11 +
.../PersistencePropertiesVaultServiceTest.java | 122 +++++++++
.../persistence/PersistentVaultServiceTest.java | 69 +++++
.../vault/{common/Entry.java => VaultEntry.java} | 42 ++-
.../apache/ignite/internal/vault/VaultManager.java | 109 +++-----
.../internal/vault/{service => }/VaultService.java | 15 +-
.../internal/vault/impl/VaultServiceImpl.java | 103 --------
.../vault/inmemory/InMemoryVaultService.java | 119 +++++++++
.../vault/persistence/PersistentVaultService.java | 207 +++++++++++++++
.../vault/persistence/RocksIteratorAdapter.java | 100 +++++++
.../internal/vault/CompletableFutureMatcher.java | 69 +++++
.../ignite/internal/vault/VaultManagerTest.java | 73 ++++++
.../ignite/internal/vault/VaultServiceTest.java | 290 +++++++++++++++++++++
.../vault/impl/VaultBaseContractsTest.java | 221 ----------------
.../vault/inmemory/InMemoryVaultServiceTest.java} | 29 +--
30 files changed, 1477 insertions(+), 625 deletions(-)
diff --git
a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityManagerTest.java
b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityManagerTest.java
index 49576a0..ed480fd 100644
---
a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityManagerTest.java
+++
b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityManagerTest.java
@@ -40,6 +40,7 @@ import
org.apache.ignite.internal.metastorage.client.WatchEvent;
import org.apache.ignite.internal.metastorage.client.WatchListener;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.VaultEntry;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteLogger;
import org.jetbrains.annotations.NotNull;
@@ -149,7 +150,7 @@ public class AffinityManagerTest {
assertEquals(INTERNAL_PREFIX + tblId, new String(key.bytes(),
StandardCharsets.UTF_8));
- return CompletableFuture.completedFuture(new
org.apache.ignite.internal.vault.common.Entry(key,
STATIC_TABLE_NAME.getBytes(StandardCharsets.UTF_8)));
+ return CompletableFuture.completedFuture(new VaultEntry(key,
STATIC_TABLE_NAME.getBytes(StandardCharsets.UTF_8)));
});
CompletableFuture<WatchListener> watchFut = new CompletableFuture<>();
diff --git
a/modules/api/src/main/java/org/apache/ignite/app/IgnitionManager.java
b/modules/api/src/main/java/org/apache/ignite/app/IgnitionManager.java
index 0ebc6f5..9904189 100644
--- a/modules/api/src/main/java/org/apache/ignite/app/IgnitionManager.java
+++ b/modules/api/src/main/java/org/apache/ignite/app/IgnitionManager.java
@@ -26,6 +26,7 @@ import org.jetbrains.annotations.Nullable;
*/
public class IgnitionManager {
/** Loaded Ignition instance. */
+ @Nullable
private static Ignition ignition;
/**
diff --git a/modules/core/pom.xml b/modules/core/pom.xml
index db4855e..e15e310 100644
--- a/modules/core/pom.xml
+++ b/modules/core/pom.xml
@@ -41,6 +41,12 @@
<!-- Test dependencies. -->
<dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-library</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 78dbd7b..46056ff 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -23,11 +23,14 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.jetbrains.annotations.Nullable;
@@ -39,7 +42,7 @@ public class IgniteUtils {
private static final int MASK = 0xf;
/** Version of the JDK. */
- private static String jdkVer;
+ private static final String jdkVer =
System.getProperty("java.specification.version");
/** Class loader used to load Ignite. */
private static final ClassLoader igniteClassLoader =
IgniteUtils.class.getClassLoader();
@@ -47,7 +50,17 @@ public class IgniteUtils {
private static final boolean assertionsEnabled;
/** Primitive class map. */
- private static final Map<String, Class<?>> primitiveMap = new
HashMap<>(16, .5f);
+ private static final Map<String, Class<?>> primitiveMap = Map.of(
+ "byte", byte.class,
+ "short", short.class,
+ "int", int.class,
+ "long", long.class,
+ "float", float.class,
+ "double", double.class,
+ "char", char.class,
+ "boolean", boolean.class,
+ "void", void.class
+ );
/** */
private static final ConcurrentMap<ClassLoader, ConcurrentMap<String,
Class<?>>> classCache =
@@ -70,18 +83,6 @@ public class IgniteUtils {
finally {
assertionsEnabled = assertionsEnabled0;
}
-
- IgniteUtils.jdkVer = System.getProperty("java.specification.version");
-
- primitiveMap.put("byte", byte.class);
- primitiveMap.put("short", short.class);
- primitiveMap.put("int", int.class);
- primitiveMap.put("long", long.class);
- primitiveMap.put("float", float.class);
- primitiveMap.put("double", double.class);
- primitiveMap.put("char", char.class);
- primitiveMap.put("boolean", boolean.class);
- primitiveMap.put("void", void.class);
}
/**
@@ -350,7 +351,7 @@ public class IgniteUtils {
cls = Class.forName(clsName, true, ldr);
- Class old = ldrMap.putIfAbsent(clsName, cls);
+ Class<?> old = ldrMap.putIfAbsent(clsName, cls);
if (old != null)
cls = old;
@@ -395,4 +396,74 @@ public class IgniteUtils {
public static boolean assertionsEnabled() {
return assertionsEnabled;
}
+
+ /**
+ * Shuts down the given executor service gradually, first disabling new
submissions and later, if
+ * necessary, cancelling remaining tasks.
+ *
+ * <p>The method takes the following steps:
+ *
+ * <ol>
+ * <li>calls {@link ExecutorService#shutdown()}, disabling acceptance of
new submitted tasks.
+ * <li>awaits executor service termination for half of the specified
timeout.
+ * <li>if the timeout expires, it calls {@link
ExecutorService#shutdownNow()}, cancelling
+ * pending tasks and interrupting running tasks.
+ * <li>awaits executor service termination for the other half of the
specified timeout.
+ * </ol>
+ *
+ * <p>If, at any step of the process, the calling thread is interrupted,
the method calls {@link
+ * ExecutorService#shutdownNow()} and returns.
+ *
+ * @param service the {@code ExecutorService} to shut down
+ * @param timeout the maximum time to wait for the {@code ExecutorService}
to terminate
+ * @param unit the time unit of the timeout argument
+ */
+ public static void shutdownAndAwaitTermination(ExecutorService service,
long timeout, TimeUnit unit) {
+ long halfTimeoutNanos = unit.toNanos(timeout) / 2;
+
+ // Disable new tasks from being submitted
+ service.shutdown();
+
+ try {
+ // Wait for half the duration of the timeout for existing tasks to
terminate
+ if (!service.awaitTermination(halfTimeoutNanos,
TimeUnit.NANOSECONDS)) {
+ // Cancel currently executing tasks
+ service.shutdownNow();
+ // Wait the other half of the timeout for tasks to respond to
being cancelled
+ service.awaitTermination(halfTimeoutNanos,
TimeUnit.NANOSECONDS);
+ }
+ }
+ catch (InterruptedException ie) {
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ // (Re-)Cancel if current thread also interrupted
+ service.shutdownNow();
+ }
+ }
+
+ /**
+ * Closes all provided objects. If any of the {@link AutoCloseable#close}
methods throw an exception, only the first
+ * thrown exception is propagated to the caller, after all other
objects are closed; later exceptions are
+ * attached to it as suppressed ones, similar to the try-with-resources block.
+ *
+ * @param closeables collection of objects to close
+ */
+ public static void closeAll(Collection<? extends AutoCloseable>
closeables) throws Exception {
+ Exception ex = null;
+
+ for (AutoCloseable closeable : closeables) {
+ try {
+ closeable.close();
+ }
+ catch (Exception e) {
+ if (ex == null)
+ ex = e;
+ else
+ ex.addSuppressed(e);
+ }
+ }
+
+ if (ex != null)
+ throw ex;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/ByteArray.java
b/modules/core/src/main/java/org/apache/ignite/lang/ByteArray.java
index a5552f2..fbe1782 100644
--- a/modules/core/src/main/java/org/apache/ignite/lang/ByteArray.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/ByteArray.java
@@ -19,7 +19,6 @@ package org.apache.ignite.lang;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
-import org.jetbrains.annotations.NotNull;
/**
* A class wraps {@code byte[]} which provides {@link Object#equals}, {@link
Object#hashCode} and
@@ -27,7 +26,6 @@ import org.jetbrains.annotations.NotNull;
*/
public final class ByteArray implements Comparable<ByteArray> {
/** Wrapped byte array. */
- @NotNull
private final byte[] arr;
/**
@@ -37,7 +35,7 @@ public final class ByteArray implements Comparable<ByteArray>
{
*
* @param arr Byte array. Can't be {@code null}.
*/
- public ByteArray(@NotNull byte[] arr) {
+ public ByteArray(byte[] arr) {
this.arr = arr;
}
@@ -46,19 +44,28 @@ public final class ByteArray implements
Comparable<ByteArray> {
*
* @param s The string key representation. Can't be {@code null}.
*/
- public ByteArray(@NotNull String s) {
+ public ByteArray(String s) {
this(s.getBytes(StandardCharsets.UTF_8));
}
/**
+ * Copy constructor, creates a shallow copy of the given {@code ByteArray}.
+ *
+ * @param other byte array to copy from
+ */
+ public ByteArray(ByteArray other) {
+ arr = other.arr;
+ }
+
+ /**
* Constructs {@code ByteArray} instance from the given string. {@link
StandardCharsets#UTF_8} charset is used for
* encoding the input string.
*
* @param s The string {@code ByteArray} representation. Can't be {@code
null}.
* @return {@code ByteArray} instance from the given string.
*/
- public static ByteArray fromString(@NotNull String s) {
- return new ByteArray(s.getBytes(StandardCharsets.UTF_8));
+ public static ByteArray fromString(String s) {
+ return new ByteArray(s);
}
/**
@@ -87,29 +94,10 @@ public final class ByteArray implements
Comparable<ByteArray> {
}
/** {@inheritDoc} */
- @Override public int compareTo(@NotNull ByteArray other) {
+ @Override public int compareTo(ByteArray other) {
return Arrays.compare(this.arr, other.arr);
}
- /**
- * Compares two {@code ByteArray} values. The value returned is identical
to what would be returned by:
- * <pre>
- * x.compareTo(y)
- * </pre>
- * <p>
- * where x and y are {@code ByteArray}'s
- *
- * @param x The first {@code ByteArray} to compare.
- * @param y The second {@code ByteArray} to compare.
- * @return the value {@code 0} if the first and second {@code ByteArray}
are equal and contain the same elements in
- * the same order; a value less than {@code 0} if the first {@code
ByteArray} is lexicographically less than the
- * second {@code ByteArray}; and a value greater than {@code 0} if the
first {@code ByteArray} is lexicographically
- * greater than the second {@code ByteArray}
- */
- public static int compare(@NotNull ByteArray x, @NotNull ByteArray y) {
- return Arrays.compare(x.arr, y.arr);
- }
-
/** {@inheritDoc} */
@Override public String toString() {
return new String(arr, StandardCharsets.UTF_8);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
new file mode 100644
index 0000000..8301f1e
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.io.IOException;
+import java.util.List;
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.arrayWithSize;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test suite for {@link IgniteUtils}.
+ */
+class IgniteUtilsTest {
+ /**
+ * Tests that all resources are closed by {@link IgniteUtils#closeAll}
even if {@link AutoCloseable#close}
+ * throws an exception.
+ */
+ @Test
+ void testCloseAll() {
+ class TestCloseable implements AutoCloseable {
+ private boolean closed = false;
+
+ /** {@inheritDoc} */
+ @Override public void close() throws Exception {
+ closed = true;
+
+ throw new IOException();
+ }
+ }
+
+ var closeables = List.of(new TestCloseable(), new TestCloseable(), new
TestCloseable());
+
+ Exception e = assertThrows(IOException.class, () ->
IgniteUtils.closeAll(closeables));
+
+ assertThat(e.getSuppressed(), arrayWithSize(2));
+
+ closeables.forEach(c -> assertTrue(c.closed));
+ }
+}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index b535b6c..697b213 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -43,12 +43,11 @@ import
org.apache.ignite.internal.metastorage.watch.AggregatedWatch;
import org.apache.ignite.internal.metastorage.watch.KeyCriterion;
import org.apache.ignite.internal.metastorage.watch.WatchAggregator;
import org.apache.ignite.internal.raft.Loza;
-import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.network.ClusterNode;
@@ -56,6 +55,9 @@ import org.apache.ignite.network.ClusterService;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+
/**
* MetaStorage manager is responsible for:
* <ul>
@@ -74,7 +76,7 @@ public class MetaStorageManager {
public static final String DISTRIBUTED_PREFIX = "dst-cfg.";
/**
- * Special key for the vault where the applied revision for {@link
MetaStorageManager#storeEntries(Collection, long)}
+ * Special key for the vault where the applied revision for {@link
MetaStorageManager#storeEntries}
* operation is stored. This mechanism is needed for committing processed
watches to {@link VaultManager}.
*/
public static final ByteArray APPLIED_REV =
ByteArray.fromString(DISTRIBUTED_PREFIX + "applied_revision");
@@ -89,7 +91,7 @@ public class MetaStorageManager {
private final Loza raftMgr;
/** Meta storage service. */
- private CompletableFuture<MetaStorageService> metaStorageSvcFut;
+ private final CompletableFuture<MetaStorageService> metaStorageSvcFut;
/**
* Aggregator of multiple watches to deploy them as one batch.
@@ -117,6 +119,11 @@ public class MetaStorageManager {
private boolean metaStorageNodesOnStart;
/**
+ * Lock for the read-then-update logic in the {@link #storeEntries} method.
+ */
+ private final Object revisionLock = new Object();
+
+ /**
* The constructor.
*
* @param vaultMgr Vault manager.
@@ -174,28 +181,23 @@ public class MetaStorageManager {
* Deploy all registered watches.
*/
public synchronized void deployWatches() {
- try {
- var watch = watchAggregator.watch(
- appliedRevision() + 1,
- this::storeEntries
- );
+ var watch = watchAggregator.watch(
+ appliedRevision() + 1,
+ this::storeEntries
+ );
- if (watch.isEmpty())
- deployFut.complete(Optional.empty());
- else {
- CompletableFuture<Void> fut =
-
dispatchAppropriateMetaStorageWatch(watch.get()).thenAccept(id ->
deployFut.complete(Optional.of(id)));
+ if (watch.isEmpty())
+ deployFut.complete(Optional.empty());
+ else {
+ CompletableFuture<Void> fut =
+ dispatchAppropriateMetaStorageWatch(watch.get()).thenAccept(id
-> deployFut.complete(Optional.of(id)));
- if (metaStorageNodesOnStart)
- fut.join();
- else {
- // TODO: need to wait for this future in init phase
https://issues.apache.org/jira/browse/IGNITE-14414
- }
+ if (metaStorageNodesOnStart)
+ fut.join();
+ else {
+ // TODO: need to wait for this future in init phase
https://issues.apache.org/jira/browse/IGNITE-14414
}
}
- catch (IgniteInternalCheckedException e) {
- throw new IgniteInternalException("Couldn't receive applied
revision during deploy watches", e);
- }
deployed = true;
}
@@ -388,8 +390,8 @@ public class MetaStorageManager {
*/
public @NotNull Cursor<Entry> range(@NotNull ByteArray keyFrom, @Nullable
ByteArray keyTo, long revUpperBound) {
return new CursorWrapper<>(
- metaStorageSvcFut,
- metaStorageSvcFut.thenApply(svc -> svc.range(keyFrom, keyTo,
revUpperBound))
+ metaStorageSvcFut,
+ metaStorageSvcFut.thenApply(svc -> svc.range(keyFrom, keyTo,
revUpperBound))
);
}
@@ -409,14 +411,7 @@ public class MetaStorageManager {
public @NotNull Cursor<Entry> rangeWithAppliedRevision(@NotNull ByteArray
keyFrom, @Nullable ByteArray keyTo) {
return new CursorWrapper<>(
metaStorageSvcFut,
- metaStorageSvcFut.thenApply(svc -> {
- try {
- return svc.range(keyFrom, keyTo, appliedRevision());
- }
- catch (IgniteInternalCheckedException e) {
- throw new IgniteInternalException(e);
- }
- })
+ metaStorageSvcFut.thenApply(svc -> svc.range(keyFrom, keyTo,
appliedRevision()))
);
}
@@ -446,16 +441,10 @@ public class MetaStorageManager {
*/
public @NotNull Cursor<Entry> prefixWithAppliedRevision(@NotNull ByteArray
keyPrefix) {
var rangeCriterion =
KeyCriterion.RangeCriterion.fromPrefixKey(keyPrefix);
+
return new CursorWrapper<>(
metaStorageSvcFut,
- metaStorageSvcFut.thenApply(svc -> {
- try {
- return svc.range(rangeCriterion.from(),
rangeCriterion.to(), appliedRevision());
- }
- catch (IgniteInternalCheckedException e) {
- throw new IgniteInternalException(e);
- }
- })
+ metaStorageSvcFut.thenApply(svc ->
svc.range(rangeCriterion.from(), rangeCriterion.to(), appliedRevision()))
);
}
@@ -504,20 +493,12 @@ public class MetaStorageManager {
}
/**
- * @return Applied revision for {@link VaultManager#putAll(Map, ByteArray,
long)} operation.
- * @throws IgniteInternalCheckedException If couldn't get applied revision
from vault.
+ * @return Applied revision for {@link VaultManager#putAll} operation.
*/
- private long appliedRevision() throws IgniteInternalCheckedException {
- byte[] appliedRevision;
+ private long appliedRevision() {
+ byte[] appliedRevision = vaultMgr.get(APPLIED_REV).join().value();
- try {
- appliedRevision = vaultMgr.get(APPLIED_REV).get().value();
- }
- catch (InterruptedException | ExecutionException e) {
- throw new IgniteInternalCheckedException("Error occurred when
getting applied revision", e);
- }
-
- return appliedRevision == null ? 0L :
ByteUtils.bytesToLong(appliedRevision);
+ return appliedRevision == null ? 0L : bytesToLong(appliedRevision);
}
/**
@@ -526,27 +507,19 @@ public class MetaStorageManager {
* @return Ignite UUID of new consolidated watch.
*/
private CompletableFuture<Optional<IgniteUuid>> updateWatches() {
- long revision;
- try {
- revision = appliedRevision() + 1;
- }
- catch (IgniteInternalCheckedException e) {
- throw new IgniteInternalException("Couldn't receive applied
revision during watch redeploy", e);
- }
-
- final var finalRevision = revision;
+ long revision = appliedRevision() + 1;
deployFut = deployFut
- .thenCompose(idOpt -> idOpt.map(id ->
metaStorageSvcFut.thenCompose(svc -> svc.stopWatch(id))).
- orElse(CompletableFuture.completedFuture(null))
- .thenCompose(r -> {
- var watch = watchAggregator.watch(finalRevision,
this::storeEntries);
-
- if (watch.isEmpty())
- return CompletableFuture.completedFuture(Optional.empty());
- else
- return
dispatchAppropriateMetaStorageWatch(watch.get()).thenApply(Optional::of);
- }));
+ .thenCompose(idOpt ->
+ idOpt
+ .map(id -> metaStorageSvcFut.thenCompose(svc ->
svc.stopWatch(id)))
+ .orElseGet(() -> CompletableFuture.completedFuture(null))
+ )
+ .thenCompose(r ->
+ watchAggregator.watch(revision, this::storeEntries)
+ .map(watch ->
dispatchAppropriateMetaStorageWatch(watch).thenApply(Optional::of))
+ .orElseGet(() ->
CompletableFuture.completedFuture(Optional.empty()))
+ );
return deployFut;
}
@@ -556,15 +529,27 @@ public class MetaStorageManager {
*
* @param entries to store.
* @param revision associated revision.
- * @return future, which will be completed when store action finished.
*/
- private CompletableFuture<Void>
storeEntries(Collection<IgniteBiTuple<ByteArray, byte[]>> entries, long
revision) {
- try {
- return vaultMgr.putAll(entries.stream().collect(
- Collectors.toMap(IgniteBiTuple::getKey,
IgniteBiTuple::getValue)), APPLIED_REV, revision);
- }
- catch (IgniteInternalCheckedException e) {
- throw new IgniteInternalException("Couldn't put entries with
considered revision.", e);
+ private void storeEntries(Collection<IgniteBiTuple<ByteArray, byte[]>>
entries, long revision) {
+ Map<ByteArray, byte[]> batch = IgniteUtils.newHashMap(entries.size() +
1);
+
+ batch.put(APPLIED_REV, longToBytes(revision));
+
+ entries.forEach(e -> batch.put(e.getKey(), e.getValue()));
+
+ synchronized (revisionLock) {
+ byte[] appliedRevisionBytes =
vaultMgr.get(APPLIED_REV).join().value();
+
+ long appliedRevision = appliedRevisionBytes == null ? 0L :
bytesToLong(appliedRevisionBytes);
+
+ if (revision <= appliedRevision) {
+ throw new IgniteInternalException(String.format(
+ "Current revision (%d) must be greater than the revision
in the Vault (%d)",
+ revision, appliedRevision
+ ));
+ }
+
+ vaultMgr.putAll(batch).join();
}
}
@@ -613,12 +598,7 @@ public class MetaStorageManager {
.metastorageNodes()
.value();
- try {
- return hasMetastorage(vaultMgr.name(), metastorageMembers);
- }
- catch (IgniteInternalCheckedException e) {
- throw new IgniteInternalException(e);
- }
+ return hasMetastorage(vaultMgr.name().join(), metastorageMembers);
}
// TODO: IGNITE-14691 Temporally solution that should be removed after
implementing reactive watches.
@@ -667,10 +647,12 @@ public class MetaStorageManager {
return it;
}
+ /** {@inheritDoc} */
@Override public boolean hasNext() {
return it.hasNext();
}
+ /** {@inheritDoc} */
@Override public T next() {
return it.next();
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/DynamicTableCreationTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/DynamicTableCreationTest.java
index 55afa3e..6ca33a2 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/DynamicTableCreationTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/DynamicTableCreationTest.java
@@ -24,8 +24,10 @@ import java.util.Map;
import java.util.UUID;
import org.apache.ignite.app.Ignite;
import org.apache.ignite.app.IgnitionManager;
+import org.apache.ignite.internal.app.IgnitionCleaner;
import org.apache.ignite.internal.schema.SchemaManager;
import
org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.schema.ColumnType;
import org.apache.ignite.schema.SchemaBuilders;
@@ -33,6 +35,7 @@ import org.apache.ignite.schema.SchemaTable;
import org.apache.ignite.table.KeyValueBinaryView;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -81,13 +84,22 @@ class DynamicTableCreationTest {
"}");
}};
+ /** */
+ private final List<Ignite> clusterNodes = new ArrayList<>();
+
+ /** */
+ @AfterEach
+ void tearDown() throws Exception {
+ IgniteUtils.closeAll(clusterNodes);
+
+ IgnitionCleaner.removeAllData();
+ }
+
/**
* Check dynamic table creation.
*/
@Test
void testDynamicSimpleTableCreation() {
- List<Ignite> clusterNodes = new ArrayList<>();
-
for (Map.Entry<String, String> nodeBootstrapCfg :
nodesBootstrapCfg.entrySet())
clusterNodes.add(IgnitionManager.start(nodeBootstrapCfg.getKey(),
nodeBootstrapCfg.getValue()));
@@ -135,8 +147,6 @@ class DynamicTableCreationTest {
*/
@Test
void testDynamicTableCreation() {
- List<Ignite> clusterNodes = new ArrayList<>();
-
for (Map.Entry<String, String> nodeBootstrapCfg :
nodesBootstrapCfg.entrySet())
clusterNodes.add(IgnitionManager.start(nodeBootstrapCfg.getKey(),
nodeBootstrapCfg.getValue()));
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/IgnitionTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/IgnitionTest.java
index 113ada5..5c9b57a 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/IgnitionTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/IgnitionTest.java
@@ -23,6 +23,9 @@ import java.util.List;
import java.util.Map;
import org.apache.ignite.app.Ignite;
import org.apache.ignite.app.IgnitionManager;
+import org.apache.ignite.internal.app.IgnitionCleaner;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -63,13 +66,22 @@ class IgnitionTest {
"}");
}};
+ /** */
+ private final List<Ignite> startedNodes = new ArrayList<>();
+
+ /** */
+ @AfterEach
+ void tearDown() throws Exception {
+ IgniteUtils.closeAll(startedNodes);
+
+ IgnitionCleaner.removeAllData();
+ }
+
/**
* Check that Ignition.start() with bootstrap configuration returns Ignite
instance.
*/
@Test
void testNodesStartWithBootstrapConfiguration() {
- List<Ignite> startedNodes = new ArrayList<>();
-
for (Map.Entry<String, String> nodeBootstrapCfg :
nodesBootstrapCfg.entrySet())
startedNodes.add(IgnitionManager.start(nodeBootstrapCfg.getKey(),
nodeBootstrapCfg.getValue()));
@@ -82,9 +94,9 @@ class IgnitionTest {
* Check that Ignition.start() with bootstrap configuration returns Ignite
instance.
*/
@Test
- void testNodeStartWithoutBootstrapConfiguration() {
- Ignite ignite = IgnitionManager.start("node0", null);
-
- Assertions.assertNotNull(ignite);
+ void testNodeStartWithoutBootstrapConfiguration() throws Exception {
+ try (Ignite ignite = IgnitionManager.start("node0", null)) {
+ Assertions.assertNotNull(ignite);
+ }
}
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/TableCreationTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/TableCreationTest.java
index 90a5a39..8c92465 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/TableCreationTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/TableCreationTest.java
@@ -24,9 +24,12 @@ import java.util.Map;
import java.util.UUID;
import org.apache.ignite.app.Ignite;
import org.apache.ignite.app.IgnitionManager;
+import org.apache.ignite.internal.app.IgnitionCleaner;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.table.KeyValueBinaryView;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -148,13 +151,22 @@ class TableCreationTest {
"}");
}};
+ /** */
+ private final List<Ignite> clusterNodes = new ArrayList<>();
+
+ /** */
+ @AfterEach
+ void tearDown() throws Exception {
+ IgniteUtils.closeAll(clusterNodes);
+
+ IgnitionCleaner.removeAllData();
+ }
+
/**
* Check table creation via bootstrap configuration with pre-configured
table.
*/
@Test
void testInitialSimpleTableConfiguration() {
- List<Ignite> clusterNodes = new ArrayList<>();
-
for (Map.Entry<String, String> nodeBootstrapCfg :
nodesBootstrapCfg.entrySet())
clusterNodes.add(IgnitionManager.start(nodeBootstrapCfg.getKey(),
nodeBootstrapCfg.getValue()));
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 7ada28b..42abedd 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.app;
import org.apache.ignite.app.Ignite;
+import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.table.manager.IgniteTables;
/**
@@ -25,22 +26,27 @@ import org.apache.ignite.table.manager.IgniteTables;
*/
public class IgniteImpl implements Ignite {
/** Distributed table manager. */
- private final IgniteTables distributedTblMgr;
+ private final IgniteTables distributedTableManager;
+
+ /** Vault manager. */
+ private final VaultManager vaultManager;
/**
- * @param TblMgr Table manager.
+ * @param tableManager Table manager.
+ * @param vaultManager Vault manager.
*/
- IgniteImpl(IgniteTables TblMgr) {
- this.distributedTblMgr = TblMgr;
+ IgniteImpl(IgniteTables tableManager, VaultManager vaultManager) {
+ this.distributedTableManager = tableManager;
+ this.vaultManager = vaultManager;
}
/** {@inheritDoc} */
@Override public IgniteTables tables() {
- return distributedTblMgr;
+ return distributedTableManager;
}
/** {@inheritDoc} */
- @Override public void close() {
- // TODO IGNITE-14581 Implement IgniteImpl close method.
+ @Override public void close() throws Exception {
+ vaultManager.close();
}
}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
index f966c6b..802ae1d 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
@@ -17,12 +17,15 @@
package org.apache.ignite.internal.app;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
-import io.netty.util.internal.StringUtil;
import org.apache.ignite.app.Ignite;
import org.apache.ignite.app.Ignition;
import org.apache.ignite.configuration.RootKey;
@@ -43,7 +46,9 @@ import
org.apache.ignite.internal.storage.DistributedConfigurationStorage;
import org.apache.ignite.internal.storage.LocalConfigurationStorage;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.vault.VaultManager;
-import org.apache.ignite.internal.vault.impl.VaultServiceImpl;
+import org.apache.ignite.internal.vault.VaultService;
+import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
+import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.lang.LoggerMessageHelper;
import org.apache.ignite.network.ClusterLocalConfiguration;
@@ -63,6 +68,11 @@ public class IgnitionImpl implements Ignition {
/** The logger. */
private static final IgniteLogger LOG =
IgniteLogger.forClass(IgnitionImpl.class);
+ /**
+ * Path to the persistent storage used by the {@link VaultService}
component.
+ */
+ static final Path VAULT_DB_PATH = Paths.get("vault");
+
/** */
private static final String[] BANNER = {
"",
@@ -85,14 +95,11 @@ public class IgnitionImpl implements Ignition {
/** {@inheritDoc} */
@Override public synchronized Ignite start(@NotNull String nodeName,
@Nullable String jsonStrBootstrapCfg) {
- assert !StringUtil.isNullOrEmpty(nodeName) : "Node local name is
empty";
+ assert nodeName != null && !nodeName.isBlank() : "Node local name is
empty";
ackBanner();
- // Vault Component startup.
- VaultManager vaultMgr = new VaultManager(new VaultServiceImpl());
-
- vaultMgr.putName(nodeName).join();
+ VaultManager vaultMgr = createVault(nodeName);
boolean cfgBootstrappedFromPds = vaultMgr.bootstrapped();
@@ -114,10 +121,10 @@ public class IgnitionImpl implements Ignition {
locConfigurationMgr.bootstrap(jsonStrBootstrapCfg,
ConfigurationType.LOCAL);
}
catch (Exception e) {
- LOG.warn("Unable to parse user specific configuration, default
configuration will be used: {}", e.getMessage());
+ LOG.warn("Unable to parse user-specific configuration, default
configuration will be used: {}", e.getMessage());
}
else if (jsonStrBootstrapCfg != null)
- LOG.warn("User specific configuration will be ignored, cause vault
was bootstrapped with pds configuration");
+ LOG.warn("User-specific configuration will be ignored, because
vault has been bootstrapped with PDS configuration");
else
locConfigurationMgr.configurationRegistry().startStorageConfigurations(ConfigurationType.LOCAL);
@@ -184,7 +191,27 @@ public class IgnitionImpl implements Ignition {
ackSuccessStart();
- return new IgniteImpl(distributedTblMgr);
+ return new IgniteImpl(distributedTblMgr, vaultMgr);
+ }
+
+ /**
+ * Starts the Vault component.
+ */
+ private static VaultManager createVault(String nodeName) {
+ Path vaultPath = VAULT_DB_PATH.resolve(nodeName);
+
+ try {
+ Files.createDirectories(vaultPath);
+ }
+ catch (IOException e) {
+ throw new IgniteInternalException(e);
+ }
+
+ var vaultMgr = new VaultManager(new PersistentVaultService(vaultPath));
+
+ vaultMgr.putName(nodeName).join();
+
+ return vaultMgr;
}
/** */
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
b/modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
index ffc4d03..f59534a 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
@@ -41,6 +41,7 @@ import
org.apache.ignite.internal.metastorage.client.WatchEvent;
import org.apache.ignite.internal.metastorage.client.WatchListener;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.VaultEntry;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteLogger;
import org.jetbrains.annotations.NotNull;
@@ -96,7 +97,7 @@ public class DistributedConfigurationStorage implements
ConfigurationStorage {
@Override public synchronized Data readAll() throws StorageException {
HashMap<String, Serializable> data = new HashMap<>();
- Iterator<org.apache.ignite.internal.vault.common.Entry> entries =
storedDistributedConfigKeys();
+ Iterator<VaultEntry> entries = storedDistributedConfigKeys();
long appliedRevision = 0L;
@@ -246,7 +247,7 @@ public class DistributedConfigurationStorage implements
ConfigurationStorage {
*
* @return Iterator built upon all distributed configuration entries
stored in vault.
*/
- private @NotNull Iterator<org.apache.ignite.internal.vault.common.Entry>
storedDistributedConfigKeys() {
+ private @NotNull Iterator<VaultEntry> storedDistributedConfigKeys() {
// TODO: rangeWithAppliedRevision could throw
OperationTimeoutException and CompactedException and we should
// TODO: properly handle such cases
https://issues.apache.org/jira/browse/IGNITE-14604
return vaultMgr.range(MASTER_KEY, DST_KEYS_END_RANGE);
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/storage/LocalConfigurationStorage.java
b/modules/runner/src/main/java/org/apache/ignite/internal/storage/LocalConfigurationStorage.java
index e6e4f7e..1ac3d93 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/storage/LocalConfigurationStorage.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/storage/LocalConfigurationStorage.java
@@ -30,7 +30,7 @@ import org.apache.ignite.internal.configuration.storage.Data;
import org.apache.ignite.internal.configuration.storage.StorageException;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.vault.VaultManager;
-import org.apache.ignite.internal.vault.common.Entry;
+import org.apache.ignite.internal.vault.VaultEntry;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteLogger;
import org.jetbrains.annotations.NotNull;
@@ -71,13 +71,13 @@ public class LocalConfigurationStorage implements
ConfigurationStorage {
/** {@inheritDoc} */
@Override public synchronized Data readAll() throws StorageException {
- Iterator<Entry> iter =
+ Iterator<VaultEntry> iter =
vaultMgr.range(LOC_KEYS_START_RANGE, LOC_KEYS_END_RANGE);
HashMap<String, Serializable> data = new HashMap<>();
while (iter.hasNext()) {
- Entry val = iter.next();
+ VaultEntry val = iter.next();
data.put(val.key().toString().substring(LOC_KEYS_START_RANGE.toString().length()),
(Serializable)ByteUtils.fromBytes(val.value()));
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/test/java/org/apache/ignite/internal/app/IgnitionCleaner.java
similarity index 58%
copy from
modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
copy to
modules/runner/src/test/java/org/apache/ignite/internal/app/IgnitionCleaner.java
index 7ada28b..75ca2a0 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/test/java/org/apache/ignite/internal/app/IgnitionCleaner.java
@@ -17,30 +17,16 @@
package org.apache.ignite.internal.app;
-import org.apache.ignite.app.Ignite;
-import org.apache.ignite.table.manager.IgniteTables;
+import org.apache.ignite.internal.util.IgniteUtils;
/**
- * Ignite internal implementation.
+ * Class for removing data of an Ignite node.
*/
-public class IgniteImpl implements Ignite {
- /** Distributed table manager. */
- private final IgniteTables distributedTblMgr;
-
+public class IgnitionCleaner {
/**
- * @param TblMgr Table manager.
+ * Removes all directories that were created during a node startup.
*/
- IgniteImpl(IgniteTables TblMgr) {
- this.distributedTblMgr = TblMgr;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteTables tables() {
- return distributedTblMgr;
- }
-
- /** {@inheritDoc} */
- @Override public void close() {
- // TODO IGNITE-14581 Implement IgniteImpl close method.
+ public static void removeAllData() {
+ IgniteUtils.delete(IgnitionImpl.VAULT_DB_PATH);
}
}
diff --git a/modules/vault/pom.xml b/modules/vault/pom.xml
index f290fef..3b2447d 100644
--- a/modules/vault/pom.xml
+++ b/modules/vault/pom.xml
@@ -43,8 +43,19 @@
<artifactId>ignite-core</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.rocksdb</groupId>
+ <artifactId>rocksdbjni</artifactId>
+ </dependency>
+
<!-- Test dependencies -->
<dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-library</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
diff --git
a/modules/vault/src/integrationTest/java/org/apache/ignite/internal/vault/persistence/PersistencePropertiesVaultServiceTest.java
b/modules/vault/src/integrationTest/java/org/apache/ignite/internal/vault/persistence/PersistencePropertiesVaultServiceTest.java
new file mode 100644
index 0000000..2fe9217
--- /dev/null
+++
b/modules/vault/src/integrationTest/java/org/apache/ignite/internal/vault/persistence/PersistencePropertiesVaultServiceTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.persistence;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.lang.ByteArray;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+import static org.apache.ignite.internal.vault.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * Test suite for testing persistence properties of {@link
PersistentVaultService}.
+ */
+class PersistencePropertiesVaultServiceTest {
+ /** */
+ private static final int TIMEOUT_SECONDS = 1;
+
+ /** */
+ private Path baseDir;
+
+ /** */
+ private Path vaultDir;
+
+ /** */
+ @BeforeEach
+ void setUp(TestInfo testInfo) throws IOException {
+ baseDir = testInfo.getTestMethod()
+ .map(Method::getName)
+ .map(Paths::get)
+ .orElseThrow();
+
+ vaultDir = baseDir.resolve("vault");
+
+ Files.createDirectories(vaultDir);
+ }
+
+ /** */
+ @AfterEach
+ void tearDown() {
+ IgniteUtils.delete(baseDir);
+ }
+
+ /**
+ * Tests that the Vault Service correctly persists data after multiple
service restarts.
+ */
+ @Test
+ void testPersistentRestart() throws Exception {
+ var data = Map.of(
+ new ByteArray("key" + 1), fromString("value" + 1),
+ new ByteArray("key" + 2), fromString("value" + 2),
+ new ByteArray("key" + 3), fromString("value" + 3)
+ );
+
+ try (var vaultService = new PersistentVaultService(vaultDir)) {
+ vaultService.putAll(data).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ }
+
+ try (var vaultService = new PersistentVaultService(vaultDir)) {
+ assertThat(
+ vaultService.get(new ByteArray("key" + 1)),
+ willBe(equalTo(new VaultEntry(new ByteArray("key" + 1),
fromString("value" + 1))))
+ );
+ }
+
+ try (
+ var vaultService = new PersistentVaultService(vaultDir);
+ var cursor = vaultService.range(new ByteArray("key" + 1), new
ByteArray("key" + 4))
+ ) {
+ var actualData = new ArrayList<VaultEntry>();
+
+ cursor.forEachRemaining(actualData::add);
+
+ List<VaultEntry> expectedData = data.entrySet().stream()
+ .map(e -> new VaultEntry(e.getKey(), e.getValue()))
+ .sorted(Comparator.comparing(VaultEntry::key))
+ .collect(Collectors.toList());
+
+ assertThat(actualData, is(expectedData));
+ }
+ }
+
+ /**
+ * Converts a {@code String} into a byte array.
+ */
+ private static byte[] fromString(String str) {
+ return str.getBytes(StandardCharsets.UTF_8);
+ }
+}
diff --git
a/modules/vault/src/integrationTest/java/org/apache/ignite/internal/vault/persistence/PersistentVaultServiceTest.java
b/modules/vault/src/integrationTest/java/org/apache/ignite/internal/vault/persistence/PersistentVaultServiceTest.java
new file mode 100644
index 0000000..eb0dbfb
--- /dev/null
+++
b/modules/vault/src/integrationTest/java/org/apache/ignite/internal/vault/persistence/PersistentVaultServiceTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.persistence;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultService;
+import org.apache.ignite.internal.vault.VaultServiceTest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Test suite for the {@link PersistentVaultService}.
+ */
+class PersistentVaultServiceTest extends VaultServiceTest {
+ /** */
+ private Path baseDir;
+
+ /** */
+ private Path vaultDir;
+
+ /** */
+ @BeforeEach
+ @Override public void setUp(TestInfo testInfo) throws IOException {
+ baseDir = testInfo.getTestMethod()
+ .map(Method::getName)
+ .map(Paths::get)
+ .orElseThrow();
+
+ vaultDir = baseDir.resolve("vault");
+
+ Files.createDirectories(vaultDir);
+
+ super.setUp(testInfo);
+ }
+
+ /** {@inheritDoc} */
+ @AfterEach
+ @Override public void tearDown() throws Exception {
+ super.tearDown();
+
+ IgniteUtils.delete(baseDir);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected VaultService getVaultService() {
+ return new PersistentVaultService(vaultDir);
+ }
+}
diff --git
a/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/Entry.java
b/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultEntry.java
similarity index 62%
rename from
modules/vault/src/main/java/org/apache/ignite/internal/vault/common/Entry.java
rename to
modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultEntry.java
index 3d4566e..3872300 100644
---
a/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/Entry.java
+++
b/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultEntry.java
@@ -15,34 +15,35 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.vault.common;
+package org.apache.ignite.internal.vault;
+import java.util.Arrays;
+import java.util.Objects;
import org.apache.ignite.lang.ByteArray;
-import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
* Represents a vault unit as entry with key and value, where
* <ul>
- * <li>key - an unique entry's key. Keys are comparable in lexicographic
manner and represented as an {@link ByteArray}.</li>
+ * <li>key - a unique entry's key. Keys are comparable in lexicographic
manner and represented as an
+ * array of bytes;</li>
* <li>value - a data which is associated with a key and represented as an
array of bytes.</li>
* </ul>
*/
-// TODO: need to generify with metastorage Entry
https://issues.apache.org/jira/browse/IGNITE-14653
-public final class Entry {
+public final class VaultEntry {
/** Key. */
private final ByteArray key;
/** Value. */
- private final byte[] val;
+ private final byte @Nullable [] val;
/**
* Constructs {@code VaultEntry} instance from the given key and value.
*
- * @param key Key as a {@code ByteArray}. Couldn't be null.
+ * @param key Key as a {@code ByteArray}. Cannot be null.
* @param val Value as a {@code byte[]}.
*/
- public Entry(@NotNull ByteArray key, byte[] val) {
+ public VaultEntry(ByteArray key, byte @Nullable [] val) {
this.key = key;
this.val = val;
}
@@ -52,25 +53,42 @@ public final class Entry {
*
* @return The {@code ByteArray}.
*/
- @NotNull public ByteArray key() {
+ public ByteArray key() {
return key;
}
/**
- * Returns a value. Could be {@code null} for empty entry.
+ * Returns a value. Can be {@code null} if the entry is empty.
*
* @return Value.
*/
- @Nullable public byte[] value() {
+ public byte @Nullable [] value() {
return val;
}
/**
- * Returns value which denotes whether entry is empty or not.
+ * Returns value which denotes whether this entry is empty or not.
*
* @return {@code True} if entry is empty, otherwise - {@code false}.
*/
public boolean empty() {
return val == null;
}
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ VaultEntry entry = (VaultEntry)o;
+ return key.equals(entry.key) && Arrays.equals(val, entry.val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int result = Objects.hash(key);
+ result = 31 * result + Arrays.hashCode(val);
+ return result;
+ }
}
diff --git
a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
b/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
index fbeb7cc..6092608 100644
---
a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
+++
b/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
@@ -18,16 +18,10 @@
package org.apache.ignite.internal.vault;
import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import org.apache.ignite.internal.util.ByteUtils;
-import org.apache.ignite.internal.vault.common.Entry;
-import org.apache.ignite.internal.vault.service.VaultService;
+import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -35,15 +29,12 @@ import org.jetbrains.annotations.Nullable;
* VaultManager is responsible for handling {@link VaultService} lifecycle
* and providing interface for managing local keys.
*/
-public class VaultManager {
+public class VaultManager implements AutoCloseable {
/** Special key, which reserved for storing the name of the current node.
*/
- private static final ByteArray NODE_NAME =
ByteArray.fromString("node_name");
-
- /** Mutex. */
- private final Object mux = new Object();
+ private static final ByteArray NODE_NAME = new ByteArray("node_name");
/** Instance of vault */
- private VaultService vaultSvc;
+ private final VaultService vaultSvc;
/** Default constructor.
*
@@ -57,52 +48,52 @@ public class VaultManager {
* @return {@code true} if VaultService beneath given VaultManager was
bootstrapped with data
* either from PDS or from user initial bootstrap configuration.
*
- * TODO: implement when IGNITE-14408 will be ready
+ * TODO: https://issues.apache.org/jira/browse/IGNITE-14956
*/
public boolean bootstrapped() {
return false;
}
/**
- * See {@link VaultService#get(ByteArray)}
+ * See {@link VaultService#get}
*
* @param key Key. Couldn't be {@code null}.
* @return An entry for the given key. Couldn't be {@code null}. If there
is no mapping for the provided {@code key},
* then {@code Entry} with value that equals to {@code null} will be
returned.
*/
- @NotNull public CompletableFuture<Entry> get(@NotNull ByteArray key) {
+ public CompletableFuture<VaultEntry> get(@NotNull ByteArray key) {
return vaultSvc.get(key);
}
/**
- * See {@link VaultService#put(ByteArray, byte[])}
+ * See {@link VaultService#put}
*
* @param key Vault key. Couldn't be {@code null}.
* @param val Value. If value is equal to {@code null}, then previous
value with key will be deleted if there was any mapping.
* @return Future representing pending completion of the operation.
Couldn't be {@code null}.
*/
- @NotNull public CompletableFuture<Void> put(@NotNull ByteArray key,
@NotNull byte[] val) {
+ public CompletableFuture<Void> put(@NotNull ByteArray key, byte @Nullable
[] val) {
return vaultSvc.put(key, val);
}
/**
- * See {@link VaultService#remove(ByteArray)}
+ * See {@link VaultService#remove}
*
* @param key Vault key. Couldn't be {@code null}.
* @return Future representing pending completion of the operation.
Couldn't be {@code null}.
*/
- @NotNull public CompletableFuture<Void> remove(@NotNull ByteArray key) {
+ public CompletableFuture<Void> remove(@NotNull ByteArray key) {
return vaultSvc.remove(key);
}
/**
- * See {@link VaultService#range(ByteArray, ByteArray)}
+ * See {@link VaultService#range}
*
* @param fromKey Start key of range (inclusive). Couldn't be {@code null}.
* @param toKey End key of range (exclusive). Could be {@code null}.
* @return Iterator built upon entries corresponding to the given range.
*/
- @NotNull public Iterator<Entry> range(@NotNull ByteArray fromKey, @NotNull
ByteArray toKey) {
+ public Cursor<VaultEntry> range(@NotNull ByteArray fromKey, @NotNull
ByteArray toKey) {
return vaultSvc.range(fromKey, toKey);
}
@@ -113,73 +104,35 @@ public class VaultManager {
* @param vals The map of keys and corresponding values. Couldn't be
{@code null} or empty.
* @return Future representing pending completion of the operation.
Couldn't be {@code null}.
*/
- @NotNull public CompletableFuture<Void> putAll(@NotNull Map<ByteArray,
byte[]> vals) {
- synchronized (mux) {
- return vaultSvc.putAll(vals);
- }
- }
-
- /**
- * Inserts or updates entries with given keys and given values and
non-negative revision. If the given value in
- * {@code vals} is {@code null}, then corresponding value with key will be
deleted if there was any mapping.
- *
- * @param vals The map of keys and corresponding values. Couldn't be
{@code null} or empty.
- * @param revision Revision for entries. Must be positive.
- * @return Future representing pending completion of the operation.
Couldn't be {@code null}.
- * @throws IgniteInternalCheckedException If revision is inconsistent with
applied revision from vault or if
- * couldn't get applied revision from vault.
- */
- public CompletableFuture<Void> putAll(@NotNull Map<ByteArray, byte[]>
vals, ByteArray appliedRevKey, long revision) throws
IgniteInternalCheckedException {
- synchronized (mux) {
- byte[] appliedRevBytes;
-
- try {
- appliedRevBytes = vaultSvc.get(appliedRevKey).get().value();
- }
- catch (InterruptedException | ExecutionException e) {
- throw new IgniteInternalCheckedException("Error occurred when
getting applied revision", e);
- }
-
- long appliedRevision = appliedRevBytes != null ?
ByteUtils.bytesToLong(appliedRevBytes) : 0L;
-
- if (revision < appliedRevision)
- throw new IgniteInternalCheckedException("Inconsistency
between applied revision from vault and the current revision");
-
- HashMap<ByteArray, byte[]> mergedMap = new HashMap<>(vals);
-
- mergedMap.put(appliedRevKey, ByteUtils.longToBytes(revision));
-
- return vaultSvc.putAll(mergedMap);
- }
+ public CompletableFuture<Void> putAll(@NotNull Map<ByteArray, byte[]>
vals) {
+ return vaultSvc.putAll(vals);
}
/**
* Persist node name to the vault.
*
- * @param name Node name to persist. Couldn't be null.
- * @return Future representing pending completion of the operation.
Couldn't be {@code null}.
+ * @param name node name to persist. Cannot be null.
+ * @return future representing pending completion of the operation.
*/
- @NotNull public CompletableFuture<Void> putName(@NotNull String name) {
+ public CompletableFuture<Void> putName(String name) {
+ if (name.isBlank())
+ throw new IllegalArgumentException("Name must not be empty");
+
return put(NODE_NAME, name.getBytes(StandardCharsets.UTF_8));
}
/**
- * @return Node name, if was stored earlier. Could be {@code null}.
- * @throws IgniteInternalCheckedException If couldn't get node name from
the vault.
+ * @return {@code CompletableFuture} which, when complete, returns the
node name if it was stored earlier,
+ * or {@code null} otherwise.
*/
- @Nullable public String name() throws IgniteInternalCheckedException {
- synchronized (mux) {
- try {
- byte[] nodeName = vaultSvc.get(NODE_NAME).get().value();
+ public CompletableFuture<String> name() {
+ return vaultSvc.get(NODE_NAME)
+ .thenApply(VaultEntry::value)
+ .thenApply(name -> name == null ? null : new String(name,
StandardCharsets.UTF_8));
+ }
- if (nodeName != null)
- return new String(nodeName, StandardCharsets.UTF_8);
- else
- return null;
- }
- catch (InterruptedException | ExecutionException e) {
- throw new IgniteInternalCheckedException("Error occurred when
getting node name", e);
- }
- }
+ /** {@inheritDoc} */
+ @Override public void close() throws Exception {
+ vaultSvc.close();
}
}
diff --git
a/modules/vault/src/main/java/org/apache/ignite/internal/vault/service/VaultService.java
b/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultService.java
similarity index 87%
rename from
modules/vault/src/main/java/org/apache/ignite/internal/vault/service/VaultService.java
rename to
modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultService.java
index 831e4f7..37d4b0a 100644
---
a/modules/vault/src/main/java/org/apache/ignite/internal/vault/service/VaultService.java
+++
b/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultService.java
@@ -15,20 +15,19 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.vault.service;
+package org.apache.ignite.internal.vault;
-import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.vault.common.Entry;
+import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.ByteArray;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
* Defines interface for accessing to a vault service.
*/
-// TODO: need to generify with MetastorageService
https://issues.apache.org/jira/browse/IGNITE-14653
-public interface VaultService {
+public interface VaultService extends AutoCloseable {
/**
* Retrieves an entry for the given key.
*
@@ -36,7 +35,7 @@ public interface VaultService {
* @return An entry for the given key. Couldn't be {@code null}. If there
is no mapping for the provided {@code key},
* then {@code Entry} with value that equals to null will be returned.
*/
- @NotNull CompletableFuture<Entry> get(@NotNull ByteArray key);
+ @NotNull CompletableFuture<VaultEntry> get(@NotNull ByteArray key);
/**
* Write value with key to vault. If value is equal to null, then previous
value with key will be deleted if there
@@ -46,7 +45,7 @@ public interface VaultService {
* @param val Value. If value is equal to null, then previous value with
key will be deleted if there was any mapping.
* @return Future representing pending completion of the operation.
Couldn't be {@code null}.
*/
- @NotNull CompletableFuture<Void> put(@NotNull ByteArray key, byte[] val);
+ @NotNull CompletableFuture<Void> put(@NotNull ByteArray key, byte
@Nullable [] val);
/**
* Remove value with key from vault.
@@ -63,7 +62,7 @@ public interface VaultService {
* @param toKey End key of range (exclusive). Could be {@code null}.
* @return Iterator built upon entries corresponding to the given range.
*/
- @NotNull Iterator<Entry> range(@NotNull ByteArray fromKey, @NotNull
ByteArray toKey);
+ @NotNull Cursor<VaultEntry> range(@NotNull ByteArray fromKey, @NotNull
ByteArray toKey);
/**
* Inserts or updates entries with given keys and given values. If the
given value in {@code vals} is null,
diff --git
a/modules/vault/src/main/java/org/apache/ignite/internal/vault/impl/VaultServiceImpl.java
b/modules/vault/src/main/java/org/apache/ignite/internal/vault/impl/VaultServiceImpl.java
deleted file mode 100644
index f5be5cb..0000000
---
a/modules/vault/src/main/java/org/apache/ignite/internal/vault/impl/VaultServiceImpl.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.vault.impl;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
-import org.apache.ignite.internal.vault.common.Entry;
-import org.apache.ignite.internal.vault.service.VaultService;
-import org.apache.ignite.lang.ByteArray;
-import org.jetbrains.annotations.NotNull;
-
-/**
- * Simple in-memory representation of vault. Only for test purposes.
- */
-public class VaultServiceImpl implements VaultService {
- /** Map to store values. */
- private final NavigableMap<ByteArray, byte[]> storage;
-
- /** Mutex. */
- private final Object mux = new Object();
-
- /**
- * Default constructor.
- */
- public VaultServiceImpl() {
- this.storage = new TreeMap<>();
- }
-
- /** {@inheritDoc} */
- @Override @NotNull public CompletableFuture<Entry> get(@NotNull ByteArray
key) {
- synchronized (mux) {
- return CompletableFuture.completedFuture(new Entry(key,
storage.get(key)));
- }
- }
-
- /** {@inheritDoc} */
- @Override @NotNull public CompletableFuture<Void> put(@NotNull ByteArray
key, @NotNull byte[] val) {
- synchronized (mux) {
- storage.put(key, val);
-
- return CompletableFuture.allOf();
- }
- }
-
- /** {@inheritDoc} */
- @Override @NotNull public CompletableFuture<Void> remove(@NotNull
ByteArray key) {
- synchronized (mux) {
- storage.remove(key);
-
- return CompletableFuture.allOf();
- }
- }
-
- /** {@inheritDoc} */
- //TODO: use Cursor instead of Iterator
https://issues.apache.org/jira/browse/IGNITE-14654
- @Override @NotNull public Iterator<Entry> range(@NotNull ByteArray
fromKey, @NotNull ByteArray toKey) {
- synchronized (mux) {
- ArrayList<Entry> res = new ArrayList<>();
-
- for (Map.Entry<ByteArray, byte[]> e : storage.subMap(fromKey,
toKey).entrySet())
- res.add(new Entry(new ByteArray(e.getKey().bytes()),
e.getValue().clone()));
-
- return res.iterator();
- }
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Void> putAll(@NotNull
Map<ByteArray, byte[]> vals) {
- synchronized (mux) {
- vals.forEach((k, v) -> {
- if (v == null)
- storage.remove(k);
- });
-
- storage.putAll(vals.entrySet()
- .stream()
- .filter(e -> e.getValue() != null)
- .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue)));
-
- return CompletableFuture.allOf();
- }
- }
-}
diff --git
a/modules/vault/src/main/java/org/apache/ignite/internal/vault/inmemory/InMemoryVaultService.java
b/modules/vault/src/main/java/org/apache/ignite/internal/vault/inmemory/InMemoryVaultService.java
new file mode 100644
index 0000000..df976a4
--- /dev/null
+++
b/modules/vault/src/main/java/org/apache/ignite/internal/vault/inmemory/InMemoryVaultService.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.inmemory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultService;
+import org.apache.ignite.lang.ByteArray;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Simple in-memory implementation of the Vault Service.
+ *
+ * <p>Entries are kept in a sorted map and every access to that map is serialized on a single mutex. All
+ * futures returned by this class are already completed.
+ */
+public class InMemoryVaultService implements VaultService {
+    /** Map to store values. Sorted by key so that {@link #range} can be served from a sub-map view. */
+    private final NavigableMap<ByteArray, byte[]> storage = new TreeMap<>();
+
+    /** Mutex guarding every access to {@link #storage}. */
+    private final Object mux = new Object();
+
+    /** {@inheritDoc} */
+    @Override @NotNull public CompletableFuture<VaultEntry> get(@NotNull ByteArray key) {
+        synchronized (mux) {
+            return CompletableFuture.completedFuture(new VaultEntry(key, storage.get(key)));
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>A {@code null} value removes the mapping, exactly like {@link #remove} and {@link #putAll} do.
+     */
+    @Override @NotNull public CompletableFuture<Void> put(@NotNull ByteArray key, byte @Nullable [] val) {
+        synchronized (mux) {
+            // Storing a null value would keep the key in the map and make range() return an entry
+            // for a key that is supposed to be deleted.
+            if (val == null)
+                storage.remove(key);
+            else
+                storage.put(key, val);
+
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override @NotNull public CompletableFuture<Void> remove(@NotNull ByteArray key) {
+        synchronized (mux) {
+            storage.remove(key);
+
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>The returned cursor iterates over a snapshot taken at the time of the call. An empty cursor is
+     * returned when {@code fromKey} is not strictly less than {@code toKey}.
+     */
+    @Override @NotNull public Cursor<VaultEntry> range(@NotNull ByteArray fromKey, @NotNull ByteArray toKey) {
+        Iterator<VaultEntry> it;
+
+        // subMap() rejects fromKey > toKey with an exception, so invalid or empty ranges are handled up front.
+        if (fromKey.compareTo(toKey) >= 0)
+            it = Collections.emptyIterator();
+        else {
+            synchronized (mux) {
+                // Copy the matching entries into a list while holding the mutex, so that the cursor
+                // is not affected by modifications made after this method returns.
+                it = storage.subMap(fromKey, toKey).entrySet().stream()
+                    .map(e -> new VaultEntry(new ByteArray(e.getKey()), e.getValue()))
+                    .collect(Collectors.toList())
+                    .iterator();
+            }
+        }
+
+        return new Cursor<>() {
+            /** Nothing to release: the cursor is backed by an in-memory snapshot. */
+            @Override public void close() {
+            }
+
+            @NotNull @Override public Iterator<VaultEntry> iterator() {
+                return this;
+            }
+
+            @Override public boolean hasNext() {
+                return it.hasNext();
+            }
+
+            @Override public VaultEntry next() {
+                return it.next();
+            }
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull CompletableFuture<Void> putAll(@NotNull Map<ByteArray, byte[]> vals) {
+        synchronized (mux) {
+            for (var entry : vals.entrySet()) {
+                // A null value means "delete this key".
+                if (entry.getValue() == null)
+                    storage.remove(entry.getKey());
+                else
+                    storage.put(entry.getKey(), entry.getValue());
+            }
+
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
+    /** No-op: this implementation holds no external resources. */
+    @Override public void close() {
+    }
+}
diff --git
a/modules/vault/src/main/java/org/apache/ignite/internal/vault/persistence/PersistentVaultService.java
b/modules/vault/src/main/java/org/apache/ignite/internal/vault/persistence/PersistentVaultService.java
new file mode 100644
index 0000000..794f8e7
--- /dev/null
+++
b/modules/vault/src/main/java/org/apache/ignite/internal/vault/persistence/PersistentVaultService.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.persistence;
+
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultService;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.CompactionPriority;
+import org.rocksdb.CompressionType;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+/**
+ * Vault Service implementation based on <a href="https://github.com/facebook/rocksdb">RocksDB</a>.
+ *
+ * <p>{@code get}, {@code put}, {@code remove} and {@code putAll} are executed asynchronously on an internal
+ * thread pool, while {@code range} opens its iterator on the calling thread.
+ */
+public class PersistentVaultService implements VaultService {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /** Thread pool on which the asynchronous storage operations are executed. */
+    private final ExecutorService threadPool = Executors.newFixedThreadPool(2);
+
+    /** RocksDB options. Kept as a field because they are a native resource released in {@link #close}. */
+    private final Options options = new Options();
+
+    /** RocksDB instance. */
+    private final RocksDB db;
+
+    /**
+     * Creates and starts the RocksDB instance using the recommended options on the given {@code path}.
+     *
+     * @param path base path for RocksDB
+     * @throws IgniteInternalException if the database could not be opened
+     */
+    public PersistentVaultService(Path path) {
+        // using the recommended options from https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning
+        options
+            .setCreateIfMissing(true)
+            .setCompressionType(CompressionType.LZ4_COMPRESSION)
+            .setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+            .setLevelCompactionDynamicLevelBytes(true)
+            .setBytesPerSync(1048576)
+            .setCompactionPriority(CompactionPriority.MinOverlappingRatio)
+            .setTableFormatConfig(
+                new BlockBasedTableConfig()
+                    .setBlockSize(16 * 1024)
+                    .setCacheIndexAndFilterBlocks(true)
+                    .setPinL0FilterAndIndexBlocksInCache(true)
+                    .setFormatVersion(5)
+                    .setFilterPolicy(new BloomFilter(10, false))
+                    .setOptimizeFiltersForMemory(true)
+            );
+
+        try {
+            db = RocksDB.open(options, path.toString());
+        }
+        catch (RocksDBException e) {
+            // The instance is never handed out, so close() will not be called: release the options here.
+            options.close();
+
+            throw new IgniteInternalException(e);
+        }
+    }
+
+    /**
+     * Stops the internal thread pool, syncs the write-ahead log and releases the RocksDB resources.
+     *
+     * @throws RocksDBException if the write-ahead log could not be synced
+     */
+    @Override
+    public void close() throws RocksDBException {
+        // try-with-resources releases the native handles (db first, then options) even if the sync fails.
+        try (options; db) {
+            // Stop the pool before syncing: operations that are still queued may write to the database, and
+            // those writes must be covered by the WAL sync below. This also guarantees that the pool is shut
+            // down when syncWal() throws.
+            // NOTE(review): assumes shutdownAndAwaitTermination lets already submitted tasks finish within the
+            // timeout - confirm against IgniteUtils.
+            IgniteUtils.shutdownAndAwaitTermination(threadPool, 10, TimeUnit.SECONDS);
+
+            db.syncWal();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @NotNull CompletableFuture<VaultEntry> get(@NotNull ByteArray key) {
+        return supplyAsync(() -> db.get(key.bytes()))
+            .thenApply(v -> new VaultEntry(key, v));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @NotNull CompletableFuture<Void> put(@NotNull ByteArray key, byte @Nullable [] val) {
+        // A null value means "delete the mapping".
+        return val == null ? remove(key) : runAsync(() -> db.put(key.bytes(), val));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @NotNull CompletableFuture<Void> remove(@NotNull ByteArray key) {
+        return runAsync(() -> db.delete(key.bytes()));
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>The returned cursor owns native resources and must be closed by the caller.
+     */
+    @Override
+    public @NotNull Cursor<VaultEntry> range(@NotNull ByteArray fromKey, @NotNull ByteArray toKey) {
+        // NOTE(review): readOpts is closed when this method returns while the iterator stays open; confirm that
+        // the RocksDB version in use does not require the ReadOptions to outlive the iterator.
+        try (var readOpts = new ReadOptions()) {
+            // The slices are not closed here: they are handed over to the adapter, which releases them
+            // together with the iterator.
+            var lowerBound = new Slice(fromKey.bytes());
+            var upperBound = new Slice(toKey.bytes());
+
+            readOpts
+                .setIterateLowerBound(lowerBound)
+                .setIterateUpperBound(upperBound);
+
+            RocksIterator it = db.newIterator(readOpts);
+            it.seekToFirst();
+
+            return new RocksIteratorAdapter(it, lowerBound, upperBound);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @NotNull CompletableFuture<Void> putAll(@NotNull Map<ByteArray, byte[]> vals) {
+        return runAsync(() -> {
+            try (
+                var writeBatch = new WriteBatch();
+                var writeOpts = new WriteOptions()
+            ) {
+                for (var entry : vals.entrySet()) {
+                    // A null value means "delete this key".
+                    if (entry.getValue() == null)
+                        writeBatch.delete(entry.getKey().bytes());
+                    else
+                        writeBatch.put(entry.getKey().bytes(), entry.getValue());
+                }
+
+                // All puts and deletes of the batch are submitted to the database in a single write.
+                db.write(writeOpts, writeBatch);
+            }
+        });
+    }
+
+    /**
+     * Same as a {@link Supplier} but throws the {@link RocksDBException}.
+     */
+    @FunctionalInterface
+    private interface RocksSupplier<T> {
+        /** Produces a result, possibly failing with a RocksDB error. */
+        T supply() throws RocksDBException;
+    }
+
+    /**
+     * Executes the given {@code supplier} on the internal thread pool. A {@link RocksDBException} thrown by the
+     * supplier is rethrown wrapped into an {@link IgniteInternalException}.
+     */
+    private <T> CompletableFuture<T> supplyAsync(RocksSupplier<T> supplier) {
+        return CompletableFuture.supplyAsync(() -> {
+            try {
+                return supplier.supply();
+            } catch (RocksDBException e) {
+                throw new IgniteInternalException(e);
+            }
+        }, threadPool);
+    }
+
+    /**
+     * Same as a {@link Runnable} but throws the {@link RocksDBException}.
+     */
+    @FunctionalInterface
+    private interface RocksRunnable {
+        /** Runs the action, possibly failing with a RocksDB error. */
+        void run() throws RocksDBException;
+    }
+
+    /**
+     * Executes the given {@code runnable} on the internal thread pool. A {@link RocksDBException} thrown by the
+     * runnable is rethrown wrapped into an {@link IgniteInternalException}.
+     */
+    private CompletableFuture<Void> runAsync(RocksRunnable runnable) {
+        return CompletableFuture.runAsync(() -> {
+            try {
+                runnable.run();
+            } catch (RocksDBException e) {
+                throw new IgniteInternalException(e);
+            }
+        }, threadPool);
+    }
+}
diff --git
a/modules/vault/src/main/java/org/apache/ignite/internal/vault/persistence/RocksIteratorAdapter.java
b/modules/vault/src/main/java/org/apache/ignite/internal/vault/persistence/RocksIteratorAdapter.java
new file mode 100644
index 0000000..d9b849d
--- /dev/null
+++
b/modules/vault/src/main/java/org/apache/ignite/internal/vault/persistence/RocksIteratorAdapter.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.persistence;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+
+/**
+ * Adapter from a {@link RocksIterator} to a {@link Cursor}.
+ *
+ * <p>The adapter takes ownership of the iterator and of both bound slices and releases them in {@link #close}.
+ */
+class RocksIteratorAdapter implements Cursor<VaultEntry> {
+    /** Underlying RocksDB iterator. */
+    private final RocksIterator it;
+
+    /**
+     * Lower iteration bound. Needed for resource management.
+     */
+    private final Slice lowerBound;
+
+    /**
+     * Upper iteration bound. Needed for resource management.
+     */
+    private final Slice upperBound;
+
+    /**
+     * @param it RocksDB iterator
+     * @param lowerBound lower iteration bound (included)
+     * @param upperBound upper iteration bound (not included)
+     */
+    RocksIteratorAdapter(RocksIterator it, Slice lowerBound, Slice upperBound) {
+        this.it = it;
+        this.lowerBound = lowerBound;
+        this.upperBound = upperBound;
+    }
+
+    /** {@inheritDoc} */
+    @NotNull @Override public Iterator<VaultEntry> iterator() {
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws Exception {
+        // The iterator was created with these slices as its bounds, so it is released first and the
+        // slices afterwards: a dependent resource must not outlive what it refers to.
+        // NOTE(review): assumes IgniteUtils.closeAll closes the resources in iteration order - confirm.
+        IgniteUtils.closeAll(List.of(it, lowerBound, upperBound));
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @throws IgniteInternalException if the underlying iterator reports an error
+     */
+    @Override public boolean hasNext() {
+        boolean isValid = it.isValid();
+
+        if (!isValid) {
+            // check the status first. This operation is guaranteed to throw if an internal error has occurred
+            // during the iteration. Otherwise we've exhausted the data range.
+            try {
+                it.status();
+            }
+            catch (RocksDBException e) {
+                throw new IgniteInternalException(e);
+            }
+        }
+
+        return isValid;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @throws NoSuchElementException if the iteration has no more elements
+     */
+    @Override public VaultEntry next() {
+        if (!hasNext())
+            throw new NoSuchElementException();
+
+        // Read the current entry before advancing: key() and value() refer to the iterator's current position.
+        var result = new VaultEntry(new ByteArray(it.key()), it.value());
+
+        it.next();
+
+        return result;
+    }
+}
diff --git
a/modules/vault/src/test/java/org/apache/ignite/internal/vault/CompletableFutureMatcher.java
b/modules/vault/src/test/java/org/apache/ignite/internal/vault/CompletableFutureMatcher.java
new file mode 100644
index 0000000..ecff5bc
--- /dev/null
+++
b/modules/vault/src/test/java/org/apache/ignite/internal/vault/CompletableFutureMatcher.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+
+/**
+ * {@link Matcher} that awaits for the given future to complete and then forwards the result to the nested
+ * {@code matcher}.
+ *
+ * @param <T> type of the future's result
+ */
+public class CompletableFutureMatcher<T> extends TypeSafeMatcher<CompletableFuture<T>> {
+    /** Maximum time, in seconds, to wait for the future to complete. */
+    private static final int TIMEOUT_SECONDS = 1;
+
+    /** Matcher that the future's result is forwarded to. */
+    private final Matcher<T> matcher;
+
+    /**
+     * @param matcher matcher to forward the result of the completable future.
+     */
+    private CompletableFutureMatcher(Matcher<T> matcher) {
+        this.matcher = matcher;
+    }
+
+    /**
+     * Waits for the future to complete and applies the nested matcher to its result.
+     *
+     * @throws AssertionError if the future fails, does not complete in time or the wait is interrupted
+     */
+    @Override protected boolean matchesSafely(CompletableFuture<T> item) {
+        try {
+            return matcher.matches(item.get(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+        }
+        catch (InterruptedException e) {
+            // Restore the interrupt flag: wrapping the exception would otherwise hide the interruption
+            // from the code that runs the test.
+            Thread.currentThread().interrupt();
+
+            throw new AssertionError(e);
+        }
+        catch (ExecutionException | TimeoutException e) {
+            throw new AssertionError(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void describeTo(Description description) {
+        description.appendText("is ").appendDescriptionOf(matcher);
+    }
+
+    /**
+     * Factory method.
+     *
+     * @param matcher matcher to forward the result of the completable future.
+     * @return matcher that awaits the future and checks its result with {@code matcher}
+     */
+    public static <T> CompletableFutureMatcher<T> willBe(Matcher<T> matcher) {
+        return new CompletableFutureMatcher<>(matcher);
+    }
+}
diff --git
a/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultManagerTest.java
b/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultManagerTest.java
new file mode 100644
index 0000000..d8283cd
--- /dev/null
+++
b/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultManagerTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.ignite.internal.vault.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Tests for the {@link VaultManager}, backed by an {@link InMemoryVaultService}.
+ */
+public class VaultManagerTest {
+    /** Maximum time, in seconds, to wait for a Vault operation to complete. */
+    private static final int TIMEOUT_SECONDS = 1;
+
+    /** Vault manager under test. */
+    private final VaultManager vaultManager = new VaultManager(new InMemoryVaultService());
+
+    /** Closes the Vault manager after every test. */
+    @AfterEach
+    void tearDown() throws Exception {
+        vaultManager.close();
+    }
+
+    /**
+     * Checks that {@link VaultManager#name} reflects the latest value written by {@link VaultManager#putName},
+     * starting from {@code null} when nothing has been written yet.
+     */
+    @Test
+    void testName() throws Exception {
+        assertThat(vaultManager.name(), willBe(nullValue(String.class)));
+
+        // The second write overwrites the first one.
+        for (String name : new String[] {"foobar", "foobarbaz"}) {
+            vaultManager.putName(name).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+            assertThat(vaultManager.name(), willBe(equalTo(name)));
+        }
+    }
+
+    /**
+     * Checks that {@link VaultManager#putName} rejects an empty string.
+     */
+    @Test
+    void testEmptyName() {
+        assertThrows(IllegalArgumentException.class, this::putEmptyName);
+    }
+
+    /**
+     * Writes an empty name and waits for the operation to complete.
+     */
+    private void putEmptyName() throws Exception {
+        vaultManager.putName("").get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    }
+}
diff --git
a/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultServiceTest.java
b/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultServiceTest.java
new file mode 100644
index 0000000..f449280
--- /dev/null
+++
b/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultServiceTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.ByteArray;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.ignite.internal.vault.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * Base class for testing {@link VaultService} implementations.
+ */
+public abstract class VaultServiceTest {
+ /** */
+ private static final int TIMEOUT_SECONDS = 1;
+
+ /** Vault. */
+ private VaultService vaultService;
+
+ /** */
+ @BeforeEach
+ public void setUp(TestInfo testInfo) throws IOException {
+ vaultService = getVaultService();
+ }
+
+ /** */
+ @AfterEach
+ public void tearDown() throws Exception {
+ vaultService.close();
+ }
+
+ /**
+ * Returns the vault service that will be tested.
+ */
+ protected abstract VaultService getVaultService();
+
+ /**
+ * Tests regular behaviour of the {@link VaultService#put} method.
+ */
+ @Test
+ public void testPut() throws Exception {
+ ByteArray key = getKey(1);
+
+ assertThat(vaultService.get(key), willBe(equalTo(new VaultEntry(key,
null))));
+
+ byte[] val = getValue(1);
+
+ doAwait(() -> vaultService.put(key, val));
+
+ assertThat(vaultService.get(key), willBe(equalTo(new VaultEntry(key,
val))));
+
+ // test idempotency
+ doAwait(() -> vaultService.put(key, val));
+
+ assertThat(vaultService.get(key), willBe(equalTo(new VaultEntry(key,
val))));
+ }
+
+ /**
+ * Tests that the {@link VaultService#put} method removes the given {@code
key} if {@code value} is {@code null}.
+ */
+ @Test
+ public void testPutWithNull() throws Exception {
+ ByteArray key = getKey(1);
+
+ byte[] val = getValue(1);
+
+ doAwait(() -> vaultService.put(key, val));
+
+ assertThat(vaultService.get(key), willBe(equalTo(new VaultEntry(key,
val))));
+
+ doAwait(() -> vaultService.put(key, null));
+
+ assertThat(vaultService.get(key), willBe(equalTo(new VaultEntry(key,
null))));
+ }
+
+ /**
+ * Tests regular behaviour of the {@link VaultService#remove} method.
+ */
+ @Test
+ public void testRemove() throws Exception {
+ ByteArray key = getKey(1);
+
+ // Remove non-existent value.
+ doAwait(() -> vaultService.remove(key));
+
+ assertThat(vaultService.get(key), willBe(equalTo(new VaultEntry(key,
null))));
+
+ byte[] val = getValue(1);
+
+ doAwait(() -> vaultService.put(key, val));
+
+ assertThat(vaultService.get(key), willBe(equalTo(new VaultEntry(key,
val))));
+
+ // Remove existing value.
+ doAwait(() -> vaultService.remove(key));
+
+ assertThat(vaultService.get(key), willBe(equalTo(new VaultEntry(key,
null))));
+ }
+
+ /**
+ * Tests regular behaviour of the {@link VaultService#putAll} method.
+ */
+ @Test
+ public void testPutAll() throws Exception {
+ Map<ByteArray, byte[]> batch = IntStream.range(0, 10)
+ .boxed()
+ .collect(toMap(VaultServiceTest::getKey,
VaultServiceTest::getValue));
+
+ doAwait(() -> vaultService.putAll(batch));
+
+ batch.forEach((k, v) -> assertThat(vaultService.get(k),
willBe(equalTo(new VaultEntry(k, v)))));
+
+ doAwait(() -> vaultService.putAll(batch));
+
+ batch.forEach((k, v) -> assertThat(vaultService.get(k),
willBe(equalTo(new VaultEntry(k, v)))));
+ }
+
+ /**
+ * Tests that the {@link VaultService#putAll} method will remove keys
whose values are {@code null}.
+ */
+ @Test
+ public void testPutAllWithNull() throws Exception {
+ Map<ByteArray, byte[]> batch = IntStream.range(0, 10)
+ .boxed()
+ .collect(toMap(VaultServiceTest::getKey,
VaultServiceTest::getValue));
+
+ doAwait(() -> vaultService.putAll(batch));
+
+ batch.forEach((k, v) -> assertThat(vaultService.get(k),
willBe(equalTo(new VaultEntry(k, v)))));
+
+ Map<ByteArray, byte[]> secondBatch = new HashMap<>();
+
+ secondBatch.put(getKey(4), getValue(3));
+ secondBatch.put(getKey(8), getValue(3));
+ secondBatch.put(getKey(1), null);
+ secondBatch.put(getKey(3), null);
+
+ doAwait(() -> vaultService.putAll(secondBatch));
+
+ assertThat(vaultService.get(getKey(4)), willBe(equalTo(new
VaultEntry(getKey(4), getValue(3)))));
+ assertThat(vaultService.get(getKey(8)), willBe(equalTo(new
VaultEntry(getKey(8), getValue(3)))));
+ assertThat(vaultService.get(getKey(1)), willBe(equalTo(new
VaultEntry(getKey(1), null))));
+ assertThat(vaultService.get(getKey(3)), willBe(equalTo(new
VaultEntry(getKey(3), null))));
+ }
+
+ /**
+ * Tests regular behaviour of the {@link VaultService#range} method.
+ */
+ @Test
+ public void testRange() throws Exception {
+ List<VaultEntry> entries = getRange(0, 10);
+
+ Map<ByteArray, byte[]> batch =
entries.stream().collect(toMap(VaultEntry::key, VaultEntry::value));
+
+ doAwait(() -> vaultService.putAll(batch));
+
+ List<VaultEntry> range = range(getKey(3), getKey(7));
+
+ assertThat(range, equalTo(getRange(3, 7)));
+ }
+
+ /**
+ * Tests that the {@link VaultService#range} returns valid entries when
passed a larger range, than the available
+ * data.
+ */
+ @Test
+ public void testRangeBoundaries() throws Exception {
+ List<VaultEntry> entries = getRange(3, 5);
+
+ Map<ByteArray, byte[]> batch =
entries.stream().collect(toMap(VaultEntry::key, VaultEntry::value));
+
+ doAwait(() -> vaultService.putAll(batch));
+
+ List<VaultEntry> range = range(getKey(0), getKey(9));
+
+ assertThat(range, equalTo(entries));
+ }
+
+ /**
+ * Tests that the {@link VaultService#range} upper bound is not
included.
+ */
+ @Test
+ public void testRangeNotIncludedBoundary() throws Exception {
+ List<VaultEntry> entries = getRange(3, 5);
+
+ Map<ByteArray, byte[]> batch =
entries.stream().collect(toMap(VaultEntry::key, VaultEntry::value));
+
+ doAwait(() -> vaultService.putAll(batch));
+
+ List<VaultEntry> range = range(getKey(3), getKey(4));
+
+ assertThat(range, equalTo(List.of(new VaultEntry(getKey(3),
getValue(3)))));
+ }
+
+ /**
+ * Tests that an empty result is returned when {@link
VaultService#range} contains invalid boundaries.
+ */
+ @Test
+ public void testRangeInvalidBoundaries() throws Exception {
+ Map<ByteArray, byte[]> batch = getRange(3,
5).stream().collect(toMap(VaultEntry::key, VaultEntry::value));
+
+ doAwait(() -> vaultService.putAll(batch));
+
+ List<VaultEntry> range = range(getKey(4), getKey(1));
+
+ assertThat(range, is(empty()));
+
+ range = range(getKey(4), getKey(4));
+
+ assertThat(range, is(empty()));
+ }
+
+ /**
+ * Creates a test key.
+ */
+ private static ByteArray getKey(int k) {
+ return new ByteArray("key" + k);
+ }
+
+ /**
+ * Creates a test value.
+ */
+ private static byte[] getValue(int v) {
+ return ("val" + v).getBytes(StandardCharsets.UTF_8);
+ }
+
+ /**
+ * Creates a range of test Vault entries.
+ */
+ private List<VaultEntry> getRange(int from, int to) {
+ return IntStream.range(from, to)
+ .mapToObj(i -> new VaultEntry(getKey(i), getValue(i)))
+ .collect(toList());
+ }
+
+ /**
+ * Performs the given action and waits for the returned future to complete.
+ */
+ private static void doAwait(Supplier<CompletableFuture<?>> supplier)
throws Exception {
+ supplier.get().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Extracts the given range of values from the Vault.
+ */
+ private List<VaultEntry> range(ByteArray from, ByteArray to) throws
Exception {
+ var result = new ArrayList<VaultEntry>();
+
+ try (Cursor<VaultEntry> cursor = vaultService.range(from, to)) {
+ cursor.forEach(result::add);
+ }
+
+ return result;
+ }
+}
diff --git
a/modules/vault/src/test/java/org/apache/ignite/internal/vault/impl/VaultBaseContractsTest.java
b/modules/vault/src/test/java/org/apache/ignite/internal/vault/impl/VaultBaseContractsTest.java
deleted file mode 100644
index d753e0a..0000000
---
a/modules/vault/src/test/java/org/apache/ignite/internal/vault/impl/VaultBaseContractsTest.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.vault.impl;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import org.apache.ignite.internal.util.ByteUtils;
-import org.apache.ignite.internal.vault.VaultManager;
-import org.apache.ignite.internal.vault.common.Entry;
-import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.lang.IgniteInternalCheckedException;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNull;
-
-/**
- * Test for base vault manager contracts.
- */
-public class VaultBaseContractsTest {
- /** Vault. */
- private VaultManager vaultMgr;
-
- /**
- * Instantiate vault.
- */
- @BeforeEach
- public void setUp() {
- vaultMgr = new VaultManager(new VaultServiceImpl());
- }
-
- /**
- * put contract
- */
- @Test
- public void put() throws ExecutionException, InterruptedException {
- ByteArray key = getKey(1);
- byte[] val = getValue(key, 1);
-
- assertNull(vaultMgr.get(key).get().value());
-
- vaultMgr.put(key, val);
-
- Entry v = vaultMgr.get(key).get();
-
- assertFalse(v.empty());
- assertEquals(val, v.value());
-
- vaultMgr.put(key, val);
-
- v = vaultMgr.get(key).get();
-
- assertFalse(v.empty());
- assertEquals(val, v.value());
- }
-
- /**
- * remove contract.
- */
- @Test
- public void remove() throws ExecutionException, InterruptedException {
- ByteArray key = getKey(1);
- byte[] val = getValue(key, 1);
-
- assertNull(vaultMgr.get(key).get().value());
-
- // Remove non-existent value.
- vaultMgr.remove(key);
-
- assertNull(vaultMgr.get(key).get().value());
-
- vaultMgr.put(key, val);
-
- Entry v = vaultMgr.get(key).get();
-
- assertFalse(v.empty());
- assertEquals(val, v.value());
-
- // Remove existent value.
- vaultMgr.remove(key);
-
- v = vaultMgr.get(key).get();
-
- assertNull(v.value());
- }
-
- /**
- * range contract.
- */
- @Test
- public void range() throws ExecutionException, InterruptedException {
- ByteArray key;
-
- Map<ByteArray, byte[]> values = new HashMap<>();
-
- for (int i = 0; i < 10; i++) {
- key = getKey(i);
-
- values.put(key, getValue(key, i));
-
- assertNull(vaultMgr.get(key).get().value());
- }
-
- values.forEach((k, v) -> vaultMgr.put(k, v));
-
- for (Map.Entry<ByteArray, byte[]> entry : values.entrySet())
- assertEquals(entry.getValue(),
vaultMgr.get(entry.getKey()).get().value());
-
- Iterator<Entry> it = vaultMgr.range(getKey(3), getKey(7));
-
- List<Entry> rangeRes = new ArrayList<>();
-
- it.forEachRemaining(rangeRes::add);
-
- assertEquals(4, rangeRes.size());
-
- //Check that we have exact range from "key3" to "key6"
- for (int i = 3; i < 7; i++)
- assertArrayEquals(values.get(getKey(i)), rangeRes.get(i -
3).value());
- }
-
- /**
- * putAll with applied revision contract.
- */
- @Test
- public void putAllAndRevision() throws ExecutionException,
InterruptedException, IgniteInternalCheckedException {
- Map<ByteArray, byte[]> entries = new HashMap<>();
-
- int entriesNum = 100;
-
- ByteArray appRevKey = ByteArray.fromString("test_applied_revision");
-
- for (int i = 0; i < entriesNum; i++) {
- ByteArray key = getKey(i);
-
- entries.put(key, getValue(key, i));
- }
-
- for (int i = 0; i < entriesNum; i++) {
- ByteArray key = getKey(i);
-
- assertNull(vaultMgr.get(key).get().value());
- }
-
- vaultMgr.putAll(entries, appRevKey, 1L);
-
- for (int i = 0; i < entriesNum; i++) {
- ByteArray key = getKey(i);
-
- assertArrayEquals(entries.get(key),
vaultMgr.get(key).get().value());
- }
-
- assertEquals(1L,
ByteUtils.bytesToLong(vaultMgr.get(appRevKey).get().value()));
- }
-
- /**
- * putAll contract.
- */
- @Test
- public void putAll() throws ExecutionException, InterruptedException {
- Map<ByteArray, byte[]> entries = new HashMap<>();
-
- int entriesNum = 100;
-
- for (int i = 0; i < entriesNum; i++) {
- ByteArray key = getKey(i);
-
- entries.put(key, getValue(key, i));
- }
-
- for (int i = 0; i < entriesNum; i++) {
- ByteArray key = getKey(i);
-
- assertNull(vaultMgr.get(key).get().value());
- }
-
- vaultMgr.putAll(entries);
-
- for (int i = 0; i < entriesNum; i++) {
- ByteArray key = getKey(i);
-
- assertArrayEquals(entries.get(key),
vaultMgr.get(key).get().value());
- }
- }
-
- /**
- * Creates key for vault entry.
- */
- private static ByteArray getKey(int k) {
- return ByteArray.fromString("key" + k);
- }
-
- /**
- * Creates value represented by byte array.
- */
- private static byte[] getValue(ByteArray k, int v) {
- return ("key" + k + '_' + "val" + v).getBytes();
- }
-}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/vault/src/test/java/org/apache/ignite/internal/vault/inmemory/InMemoryVaultServiceTest.java
similarity index 56%
copy from
modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
copy to
modules/vault/src/test/java/org/apache/ignite/internal/vault/inmemory/InMemoryVaultServiceTest.java
index 7ada28b..3e8a695 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/vault/src/test/java/org/apache/ignite/internal/vault/inmemory/InMemoryVaultServiceTest.java
@@ -15,32 +15,17 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.app;
+package org.apache.ignite.internal.vault.inmemory;
-import org.apache.ignite.app.Ignite;
-import org.apache.ignite.table.manager.IgniteTables;
+import org.apache.ignite.internal.vault.VaultService;
+import org.apache.ignite.internal.vault.VaultServiceTest;
/**
- * Ignite internal implementation.
+ * Test suite for the {@link InMemoryVaultService}.
*/
-public class IgniteImpl implements Ignite {
- /** Distributed table manager. */
- private final IgniteTables distributedTblMgr;
-
- /**
- * @param TblMgr Table manager.
- */
- IgniteImpl(IgniteTables TblMgr) {
- this.distributedTblMgr = TblMgr;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteTables tables() {
- return distributedTblMgr;
- }
-
+class InMemoryVaultServiceTest extends VaultServiceTest {
/** {@inheritDoc} */
- @Override public void close() {
- // TODO IGNITE-14581 Implement IgniteImpl close method.
+ @Override protected VaultService getVaultService() {
+ return new InMemoryVaultService();
}
}