This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 4363510950 IGNITE-21496 Make Vault methods synchronous (#3189)
4363510950 is described below
commit 4363510950a71d6641e31aa83de35fc40434b11e
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Fri Feb 9 11:54:43 2024 +0200
IGNITE-21496 Make Vault methods synchronous (#3189)
---
.../management/ClusterManagementGroupManager.java | 24 ++--
.../cluster/management/LocalStateStorage.java | 23 ++--
.../internal/cluster/management/MockNode.java | 4 +-
...niteDistributionZoneManagerNodeRestartTest.java | 4 +-
.../internal/network/recovery/VaultStaleIds.java | 6 +-
.../network/recovery/VaultStaleIdsTest.java | 51 +++-----
modules/platforms/cpp/ignite/common/error_codes.h | 0
.../platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs | 0
.../ItDistributedConfigurationStorageTest.java | 3 +-
.../runner/app/ItIgniteNodeRestartTest.java | 4 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 8 +-
.../storage/LocalConfigurationStorage.java | 27 +++--
.../ignite/internal/BaseIgniteRestartTest.java | 4 +-
.../rebalance/ItRebalanceDistributedTest.java | 6 +-
.../internal/table/distributed/LowWatermark.java | 80 ++++++-------
.../table/distributed/LowWatermarkTest.java | 13 +-
.../table/distributed/TableManagerTest.java | 4 -
.../apache/ignite/internal/vault/VaultManager.java | 34 +++---
.../apache/ignite/internal/vault/VaultService.java | 14 +--
.../vault/persistence/PersistentVaultService.java | 133 +++++++--------------
.../ignite/internal/vault/VaultManagerTest.java | 12 +-
.../ignite/internal/vault/VaultServiceTest.java | 62 +++++-----
.../ItPersistencePropertiesVaultServiceTest.java | 16 +--
.../persistence/ItPersistentVaultServiceTest.java | 2 +-
.../vault/inmemory/InMemoryVaultService.java | 80 +++++--------
25 files changed, 243 insertions(+), 371 deletions(-)
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index 372ce984fd..0ef44d4dfb 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -247,17 +247,7 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
*/
@Nullable
private CompletableFuture<CmgRaftService> recoverLocalState() {
- LocalState localState;
-
- try {
- localState = localStateStorage.getLocalState().get();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new IgniteInternalException("Interrupted while retrieving
local CMG state", e);
- } catch (ExecutionException e) {
- throw new IgniteInternalException("Error while retrieving local
CMG state", e);
- }
+ LocalState localState = localStateStorage.getLocalState();
if (localState == null) {
return null;
@@ -336,8 +326,9 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
.thenCompose(state -> {
var localState = new LocalState(state.cmgNodes(),
state.clusterTag());
- return localStateStorage.saveLocalState(localState)
- .thenCompose(v -> joinCluster(service,
state.clusterTag()));
+ localStateStorage.saveLocalState(localState);
+
+ return joinCluster(service, state.clusterTag());
});
}
@@ -440,7 +431,7 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
raftManager.stopRaftNodes(CmgGroupId.INSTANCE);
- localStateStorage.clear().get();
+ localStateStorage.clear();
} catch (Exception e) {
throw new IgniteInternalException("Error when cleaning the CMG
state", e);
}
@@ -578,8 +569,9 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
.thenCompose(service -> {
var localState = new LocalState(state.cmgNodes(),
state.clusterTag());
- return localStateStorage.saveLocalState(localState)
- .thenCompose(v -> joinCluster(service,
state.clusterTag()));
+ localStateStorage.saveLocalState(localState);
+
+ return joinCluster(service, state.clusterTag());
});
}
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/LocalStateStorage.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/LocalStateStorage.java
index 9a6bcd8438..d708e5e038 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/LocalStateStorage.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/LocalStateStorage.java
@@ -19,10 +19,11 @@ package org.apache.ignite.internal.cluster.management;
import java.io.Serializable;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
import org.apache.ignite.internal.vault.VaultManager;
+import org.jetbrains.annotations.Nullable;
/**
* Class that represents a local CMG state (persisted in the Vault).
@@ -33,6 +34,8 @@ class LocalStateStorage {
private static final ByteArray CMG_STATE_VAULT_KEY =
ByteArray.fromString("cmg_state");
static class LocalState implements Serializable {
+ private static final long serialVersionUID = -5069326157367860480L;
+
private final Set<String> cmgNodeNames;
private final ClusterTag clusterTag;
@@ -62,27 +65,25 @@ class LocalStateStorage {
*
* @return Local state.
*/
- CompletableFuture<LocalState> getLocalState() {
- return vault.get(CMG_STATE_VAULT_KEY)
- .thenApply(entry -> entry == null ? null :
ByteUtils.fromBytes(entry.value()));
+ @Nullable LocalState getLocalState() {
+ VaultEntry entry = vault.get(CMG_STATE_VAULT_KEY);
+
+ return entry == null ? null : ByteUtils.fromBytes(entry.value());
}
/**
* Saves a given local state.
*
* @param state Local state to save.
- * @return Future that represents the state of the operation.
*/
- CompletableFuture<Void> saveLocalState(LocalState state) {
- return vault.put(CMG_STATE_VAULT_KEY, ByteUtils.toBytes(state));
+ void saveLocalState(LocalState state) {
+ vault.put(CMG_STATE_VAULT_KEY, ByteUtils.toBytes(state));
}
/**
* Removes all data from the local storage.
- *
- * @return Future that represents the state of the operation.
*/
- CompletableFuture<Void> clear() {
- return vault.remove(CMG_STATE_VAULT_KEY);
+ void clear() {
+ vault.remove(CMG_STATE_VAULT_KEY);
}
}
diff --git
a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
index d4e3c87261..8db7c05a7f 100644
---
a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
+++
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
@@ -18,8 +18,6 @@
package org.apache.ignite.internal.cluster.management;
-import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
-
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -105,7 +103,7 @@ public class MockNode {
private void init(int port) throws IOException {
Path vaultDir = workDir.resolve("vault");
- var vaultManager = new VaultManager(new
PersistentVaultService(testNodeName(testInfo, port),
Files.createDirectories(vaultDir)));
+ var vaultManager = new VaultManager(new
PersistentVaultService(Files.createDirectories(vaultDir)));
this.clusterService = ClusterServiceTestUtils.clusterService(testInfo,
port, nodeFinder);
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
index 8bc669ef5b..76516e2e88 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
@@ -161,7 +161,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest
extends BaseIgniteRe
List<IgniteComponent> components = new ArrayList<>();
- VaultManager vault = createVault(name, dir);
+ VaultManager vault = createVault(dir);
ConfigurationModules modules = loadConfigurationModules(log,
Thread.currentThread().getContextClassLoader());
@@ -261,7 +261,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest
extends BaseIgniteRe
// Start.
vault.start();
- vault.putName(name).join();
+ vault.putName(name);
nodeCfgMgr.start();
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/VaultStaleIds.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/VaultStaleIds.java
index 2ad20206e4..d02f94b5e7 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/VaultStaleIds.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/VaultStaleIds.java
@@ -28,7 +28,7 @@ import org.apache.ignite.internal.vault.VaultEntry;
import org.apache.ignite.internal.vault.VaultManager;
/**
- * {@link StaleIds} implementating using Vault as a persistent storage.
+ * {@link StaleIds} implementation using Vault as a persistent storage.
*/
public class VaultStaleIds implements StaleIds {
private static final ByteArray STALE_IDS_KEY = new
ByteArray("network.staleIds");
@@ -64,7 +64,7 @@ public class VaultStaleIds implements StaleIds {
}
private Set<String> loadStaleIdsFromVault() {
- VaultEntry entry = vaultManager.get(STALE_IDS_KEY).join();
+ VaultEntry entry = vaultManager.get(STALE_IDS_KEY);
if (entry == null) {
return new LinkedHashSet<>();
@@ -99,6 +99,6 @@ public class VaultStaleIds implements StaleIds {
private void saveIdsToVault() {
String joinedIds = String.join("\n", staleIds);
- vaultManager.put(STALE_IDS_KEY, joinedIds.getBytes(UTF_8)).join();
+ vaultManager.put(STALE_IDS_KEY, joinedIds.getBytes(UTF_8));
}
}
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/VaultStaleIdsTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/VaultStaleIdsTest.java
index c3c4b9b38c..7154edfeb3 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/VaultStaleIdsTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/VaultStaleIdsTest.java
@@ -18,19 +18,14 @@
package org.apache.ignite.internal.network.recovery;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.concurrent.CompletableFuture.completedFuture;
-import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.vault.VaultEntry;
@@ -38,6 +33,7 @@ import org.apache.ignite.internal.vault.VaultManager;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -57,8 +53,7 @@ class VaultStaleIdsTest extends BaseIgniteAbstractTest {
@Test
void consultsVaultWhenCheckingForStaleness() {
- doReturn(completedFuture(new VaultEntry(staleIdsKey,
"id1\nid2\nid3".getBytes(UTF_8))))
- .when(vaultManager).get(staleIdsKey);
+ when(vaultManager.get(staleIdsKey)).thenReturn(new
VaultEntry(staleIdsKey, "id1\nid2\nid3".getBytes(UTF_8)));
assertThat(staleIds.isIdStale("id1"), is(true));
assertThat(staleIds.isIdStale("id2"), is(true));
@@ -68,56 +63,44 @@ class VaultStaleIdsTest extends BaseIgniteAbstractTest {
@Test
void cachesVaultStateInMemory() {
- doReturn(completedFuture(new VaultEntry(staleIdsKey,
"id1\nid2\nid3".getBytes(UTF_8))))
- .when(vaultManager).get(staleIdsKey);
+ when(vaultManager.get(staleIdsKey)).thenReturn(new
VaultEntry(staleIdsKey, "id1\nid2\nid3".getBytes(UTF_8)));
staleIds.isIdStale("id1");
staleIds.isIdStale("id2");
staleIds.isIdStale("id3");
- verify(vaultManager, times(1)).get(any());
+ verify(vaultManager).get(any());
}
@Test
void savesNewStaleIdsToVault() {
- doReturn(nullCompletedFuture()).when(vaultManager).get(staleIdsKey);
- doReturn(nullCompletedFuture())
- .when(vaultManager).put(staleIdsKey, "id2".getBytes(UTF_8));
- doReturn(nullCompletedFuture())
- .when(vaultManager).put(staleIdsKey,
"id2\nid1".getBytes(UTF_8));
-
staleIds.markAsStale("id2");
+
+ verify(vaultManager).put(staleIdsKey, "id2".getBytes(UTF_8));
+
staleIds.markAsStale("id1");
+
+ verify(vaultManager).put(staleIdsKey, "id2\nid1".getBytes(UTF_8));
}
@Test
void respectsMaxIdsLimit() {
staleIds = new VaultStaleIds(vaultManager, 2);
- doReturn(nullCompletedFuture()).when(vaultManager).get(staleIdsKey);
-
- AtomicReference<String> lastSavedIds = new AtomicReference<>();
-
- doAnswer(invocation -> {
- byte[] value = invocation.getArgument(1);
-
- lastSavedIds.set(new String(value, UTF_8));
-
- return nullCompletedFuture();
- }).when(vaultManager).put(eq(staleIdsKey), any());
-
staleIds.markAsStale("id3");
staleIds.markAsStale("id2");
staleIds.markAsStale("id1");
- assertThat(lastSavedIds.get(), is("id2\nid1"));
+ ArgumentCaptor<byte[]> idsCaptor =
ArgumentCaptor.forClass(byte[].class);
+
+ verify(vaultManager, times(3)).put(eq(staleIdsKey),
idsCaptor.capture());
+
+ assertThat(idsCaptor.getValue(), is("id2\nid1".getBytes(UTF_8)));
}
@Test
void loadsBeforeDoingFirstSave() {
- lenient().doReturn(completedFuture(new VaultEntry(staleIdsKey,
"id1".getBytes(UTF_8))))
- .when(vaultManager).get(staleIdsKey);
-
doReturn(nullCompletedFuture()).when(vaultManager).put(eq(staleIdsKey), any());
+ when(vaultManager.get(staleIdsKey)).thenReturn(new
VaultEntry(staleIdsKey, "id1".getBytes(UTF_8)));
staleIds.markAsStale("id2");
diff --git a/modules/platforms/cpp/ignite/common/error_codes.h
b/modules/platforms/cpp/ignite/common/error_codes.h
old mode 100755
new mode 100644
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
old mode 100755
new mode 100644
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index 8e2258f211..d0de55fb56 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.configuration.storage;
-import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -114,7 +113,7 @@ public class ItDistributedConfigurationStorageTest extends
BaseIgniteAbstractTes
Node(TestInfo testInfo, Path workDir) {
var addr = new NetworkAddress("localhost", 10000);
- vaultManager = new VaultManager(new
PersistentVaultService(testNodeName(testInfo, addr.port()),
workDir.resolve("vault")));
+ vaultManager = new VaultManager(new
PersistentVaultService(workDir.resolve("vault")));
clusterService = ClusterServiceTestUtils.clusterService(
testInfo,
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 13a685d976..5dfd681e2b 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -268,7 +268,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
List<IgniteComponent> components = new ArrayList<>();
- VaultManager vault = createVault(name, dir);
+ VaultManager vault = createVault(dir);
ConfigurationModules modules = loadConfigurationModules(log,
Thread.currentThread().getContextClassLoader());
@@ -561,7 +561,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
// Start.
vault.start();
- vault.putName(name).join();
+ vault.putName(name);
nodeCfgMgr.start();
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 2f76950b2f..8aac39c636 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -367,7 +367,7 @@ public class IgniteImpl implements Ignite {
threadPoolsManager = new ThreadPoolsManager(name);
- vaultMgr = createVault(name, workDir);
+ vaultMgr = createVault(workDir);
metricManager = new MetricManager();
@@ -879,7 +879,7 @@ public class IgniteImpl implements Ignite {
lifecycleManager.startComponent(vaultMgr);
- vaultMgr.putName(name).get();
+ vaultMgr.putName(name);
// Node configuration manager startup.
lifecycleManager.startComponent(nodeCfgMgr);
@@ -1215,7 +1215,7 @@ public class IgniteImpl implements Ignite {
/**
* Starts the Vault component.
*/
- private static VaultManager createVault(String nodeName, Path workDir) {
+ private static VaultManager createVault(Path workDir) {
Path vaultPath = workDir.resolve(VAULT_DB_PATH);
try {
@@ -1224,7 +1224,7 @@ public class IgniteImpl implements Ignite {
throw new IgniteInternalException(e);
}
- return new VaultManager(new PersistentVaultService(nodeName,
vaultPath));
+ return new VaultManager(new PersistentVaultService(vaultPath));
}
/**
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
index 52c636cd9f..ee4c143dcc 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
@@ -113,11 +113,15 @@ public class LocalConfigurationStorage implements
ConfigurationStorage {
/** {@inheritDoc} */
@Override
public CompletableFuture<Serializable> readLatest(String key) {
- return vaultMgr.get(new ByteArray(LOC_PREFIX + key))
- .thenApply(entry -> entry == null ? null :
fromBytes(entry.value()))
- .exceptionally(e -> {
- throw new StorageException("Exception while reading vault
entry", e);
- });
+ return registerFuture(supplyAsync(() -> {
+ try {
+ VaultEntry entry = vaultMgr.get(new ByteArray(LOC_PREFIX +
key));
+
+ return entry == null ? null : fromBytes(entry.value());
+ } catch (Exception e) {
+ throw new StorageException("Exception while reading vault
entry", e);
+ }
+ }, threadPool));
}
/** {@inheritDoc} */
@@ -189,9 +193,9 @@ public class LocalConfigurationStorage implements
ConfigurationStorage {
Data entries = new Data(newValues, version + 1);
- return vaultMgr.putAll(data)
- .thenCompose(v ->
lsnr.onEntriesChanged(entries))
- .thenApply(v -> true);
+ vaultMgr.putAll(data);
+
+ return lsnr.onEntriesChanged(entries).thenApply(v ->
true);
}, threadPool));
// ignore any errors on the write future, because we are only
interested in its completion
@@ -222,8 +226,11 @@ public class LocalConfigurationStorage implements
ConfigurationStorage {
/** {@inheritDoc} */
@Override
public CompletableFuture<Long> lastRevision() {
- return vaultMgr.get(VERSION_KEY)
- .thenApply(entry -> entry == null ? 0 : (Long)
fromBytes(entry.value()));
+ return registerFuture(supplyAsync(() -> {
+ VaultEntry entry = vaultMgr.get(VERSION_KEY);
+
+ return entry == null ? 0 : (Long) fromBytes(entry.value());
+ }, threadPool));
}
@Override
diff --git
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/BaseIgniteRestartTest.java
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/BaseIgniteRestartTest.java
index 1afc9df950..a731be183d 100644
---
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/BaseIgniteRestartTest.java
+++
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/BaseIgniteRestartTest.java
@@ -180,7 +180,7 @@ public abstract class BaseIgniteRestartTest extends
IgniteAbstractTest {
/**
* Starts the Vault component.
*/
- public static VaultManager createVault(String nodeName, Path workDir) {
+ public static VaultManager createVault(Path workDir) {
Path vaultPath = workDir.resolve(Paths.get("vault"));
try {
@@ -189,7 +189,7 @@ public abstract class BaseIgniteRestartTest extends
IgniteAbstractTest {
throw new IgniteInternalException(e);
}
- return new VaultManager(new PersistentVaultService(nodeName,
vaultPath));
+ return new VaultManager(new PersistentVaultService(vaultPath));
}
/**
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index dd61c4978f..c5671db508 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -931,7 +931,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
Path dir = workDir.resolve(name);
- vaultManager = createVault(name, dir);
+ vaultManager = createVault(dir);
nodeCfgGenerator = new ConfigurationTreeGenerator(
List.of(
@@ -1287,8 +1287,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
/**
* Starts the Vault component.
*/
- private static VaultManager createVault(String nodeName, Path workDir) {
- return new VaultManager(new PersistentVaultService(nodeName,
resolveDir(workDir, "vault")));
+ private static VaultManager createVault(Path workDir) {
+ return new VaultManager(new PersistentVaultService(resolveDir(workDir,
"vault")));
}
private static Path resolveDir(Path workDir, String dirName) {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
index 4db46d68b0..c0cf426414 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.table.distributed;
import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
-import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import java.util.concurrent.Executors;
@@ -43,6 +42,7 @@ import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
import org.apache.ignite.internal.vault.VaultManager;
import org.jetbrains.annotations.Nullable;
@@ -76,7 +76,7 @@ public class LowWatermark implements ManuallyCloseable {
private final AtomicBoolean closeGuard = new AtomicBoolean();
- private final AtomicReference<HybridTimestamp> lowWatermark = new
AtomicReference<>();
+ private volatile HybridTimestamp lowWatermark;
private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture
= new AtomicReference<>();
@@ -119,50 +119,45 @@ public class LowWatermark implements ManuallyCloseable {
*/
public void start() {
inBusyLock(busyLock, () -> {
- vaultManager.get(LOW_WATERMARK_VAULT_KEY)
- .thenCompose(vaultEntry -> inBusyLock(busyLock, () -> {
- if (vaultEntry == null) {
- scheduleUpdateLowWatermarkBusy();
+ HybridTimestamp lowWatermark = readLowWatermarkFromVault();
- return nullCompletedFuture();
- }
+ if (lowWatermark == null) {
+ LOG.info("Previous value of the low watermark was not found,
will schedule to update it");
+
+ scheduleUpdateLowWatermarkBusy();
- HybridTimestamp lowWatermark =
ByteUtils.fromBytes(vaultEntry.value());
+ return;
+ }
- return txManager.updateLowWatermark(lowWatermark)
- .thenApply(unused -> {
- this.lowWatermark.set(lowWatermark);
+ LOG.info(
+ "Low watermark has been successfully retrieved from the
vault and is scheduled to be updated: {}",
+ lowWatermark
+ );
-
runGcAndScheduleUpdateLowWatermarkBusy(lowWatermark);
+ txManager.updateLowWatermark(lowWatermark)
+ .thenRun(() -> inBusyLock(busyLock, () -> {
+ this.lowWatermark = lowWatermark;
- return lowWatermark;
- });
+ runGcAndScheduleUpdateLowWatermarkBusy(lowWatermark);
}))
- .whenComplete((lowWatermark, throwable) -> {
- if (throwable != null) {
- if (!(throwable instanceof NodeStoppingException))
{
- LOG.error("Error getting low watermark",
throwable);
+ .whenComplete((unused, throwable) -> {
+ if (throwable != null && !(throwable instanceof
NodeStoppingException)) {
+ LOG.error("Error during the Watermark manager
start", throwable);
- failureProcessor.process(new
FailureContext(CRITICAL_ERROR, throwable));
+ failureProcessor.process(new
FailureContext(CRITICAL_ERROR, throwable));
- inBusyLock(busyLock,
this::scheduleUpdateLowWatermarkBusy);
- }
- } else {
- if (lowWatermark == null) {
- LOG.info(
- "Previous value of the low watermark
was not found, will schedule to update it"
- );
- } else {
- LOG.info(
- "Low watermark has been successfully
got from the vault and is scheduled to be updated: {}",
- lowWatermark
- );
- }
+ inBusyLock(busyLock,
this::scheduleUpdateLowWatermarkBusy);
}
});
});
}
+ private @Nullable HybridTimestamp readLowWatermarkFromVault() {
+ VaultEntry vaultEntry = vaultManager.get(LOW_WATERMARK_VAULT_KEY);
+
+ return vaultEntry == null ? null :
ByteUtils.fromBytes(vaultEntry.value());
+ }
+
@Override
public void close() {
if (!closeGuard.compareAndSet(false, true)) {
@@ -184,7 +179,7 @@ public class LowWatermark implements ManuallyCloseable {
* Returns the current low watermark, {@code null} means no low watermark
has been assigned yet.
*/
public @Nullable HybridTimestamp getLowWatermark() {
- return lowWatermark.get();
+ return lowWatermark;
}
void updateLowWatermark() {
@@ -195,18 +190,13 @@ public class LowWatermark implements ManuallyCloseable {
// created, then we can safely promote the candidate as a new low
watermark, store it in vault, and we can safely start cleaning
// up the stale/junk data in the tables.
txManager.updateLowWatermark(lowWatermarkCandidate)
- .thenComposeAsync(
- unused -> inBusyLock(
- busyLock,
- () ->
vaultManager.put(LOW_WATERMARK_VAULT_KEY,
ByteUtils.toBytes(lowWatermarkCandidate))
- ),
- scheduledThreadPool
- )
- .thenRun(() -> inBusyLock(busyLock, () -> {
- lowWatermark.set(lowWatermarkCandidate);
+ .thenRunAsync(() -> inBusyLock(busyLock, () -> {
+ vaultManager.put(LOW_WATERMARK_VAULT_KEY,
ByteUtils.toBytes(lowWatermarkCandidate));
+
+ lowWatermark = lowWatermarkCandidate;
runGcAndScheduleUpdateLowWatermarkBusy(lowWatermarkCandidate);
- }))
+ }), scheduledThreadPool)
.whenComplete((unused, throwable) -> {
if (throwable != null) {
if (!(throwable instanceof NodeStoppingException))
{
@@ -250,7 +240,7 @@ public class LowWatermark implements ManuallyCloseable {
-lowWatermarkConfig.dataAvailabilityTime().value() -
getMaxClockSkew()
);
- HybridTimestamp lowWatermark = this.lowWatermark.get();
+ HybridTimestamp lowWatermark = this.lowWatermark;
assert lowWatermark == null ||
lowWatermarkCandidate.compareTo(lowWatermark) > 0 :
"lowWatermark=" + lowWatermark + ", lowWatermarkCandidate=" +
lowWatermarkCandidate;
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java
index 7de224a2c4..42997ac27c 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.table.distributed;
-import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.table.distributed.LowWatermark.LOW_WATERMARK_VAULT_KEY;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -46,7 +45,6 @@ import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.lang.ByteArray;
import
org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
import org.apache.ignite.internal.table.distributed.gc.MvGc;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
@@ -84,15 +82,13 @@ public class LowWatermarkTest extends
BaseIgniteAbstractTest {
}
@AfterEach
- void tearDown() throws Exception {
+ void tearDown() {
lowWatermark.close();
}
@Test
void testStartWithEmptyVault() {
// Let's check the start with no low watermark in vault.
-
when(vaultManager.get(LOW_WATERMARK_VAULT_KEY)).thenReturn(nullCompletedFuture());
-
lowWatermark.start();
verify(mvGc, never()).updateLowWatermark(any(HybridTimestamp.class));
@@ -104,7 +100,7 @@ public class LowWatermarkTest extends
BaseIgniteAbstractTest {
HybridTimestamp lowWatermark = new HybridTimestamp(10, 10);
when(vaultManager.get(LOW_WATERMARK_VAULT_KEY))
- .thenReturn(completedFuture(new
VaultEntry(LOW_WATERMARK_VAULT_KEY, ByteUtils.toBytes(lowWatermark))));
+ .thenReturn(new VaultEntry(LOW_WATERMARK_VAULT_KEY,
ByteUtils.toBytes(lowWatermark)));
when(txManager.updateLowWatermark(any(HybridTimestamp.class))).thenReturn(nullCompletedFuture());
@@ -134,8 +130,6 @@ public class LowWatermarkTest extends
BaseIgniteAbstractTest {
when(txManager.updateLowWatermark(any(HybridTimestamp.class))).thenReturn(nullCompletedFuture());
- when(vaultManager.put(any(ByteArray.class),
any(byte[].class))).thenReturn(nullCompletedFuture());
-
// Make a predictable candidate to make it easier to test.
HybridTimestamp newLowWatermarkCandidate =
lowWatermark.createNewLowWatermarkCandidate();
@@ -157,11 +151,8 @@ public class LowWatermarkTest extends
BaseIgniteAbstractTest {
*/
@Test
void testUpdateWatermarkSequentially() throws Exception {
-
when(vaultManager.get(LOW_WATERMARK_VAULT_KEY)).thenReturn(nullCompletedFuture());
assertThat(lowWatermarkConfig.updateFrequency().update(10L),
willSucceedFast());
- when(vaultManager.put(any(ByteArray.class),
any(byte[].class))).thenReturn(nullCompletedFuture());
-
CountDownLatch startGetAllReadOnlyTransactions = new CountDownLatch(3);
CompletableFuture<Void> finishGetAllReadOnlyTransactions = new
CompletableFuture<>();
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 7161778da0..8e2052afcb 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -77,7 +77,6 @@ import
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
-import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -704,9 +703,6 @@ public class TableManagerTest extends IgniteAbstractTest {
Consumer<TxStateTableStorage> txStateTableStorageDecorator) {
VaultManager vaultManager = mock(VaultManager.class);
-
when(vaultManager.get(any(ByteArray.class))).thenReturn(nullCompletedFuture());
- when(vaultManager.put(any(ByteArray.class),
any(byte[].class))).thenReturn(nullCompletedFuture());
-
TableManager tableManager = new TableManager(
NODE_NAME,
revisionUpdater,
diff --git
a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
b/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
index 02e910dc54..d94f6435d8 100644
---
a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
+++
b/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
@@ -46,7 +46,6 @@ public class VaultManager implements IgniteComponent {
this.vaultSvc = vaultSvc;
}
- /** {@inheritDoc} */
@Override
public CompletableFuture<Void> start() {
vaultSvc.start();
@@ -54,7 +53,6 @@ public class VaultManager implements IgniteComponent {
return nullCompletedFuture();
}
- /** {@inheritDoc} */
@Override
public void stop() {
// TODO: IGNITE-15161 Implement component's stop.
@@ -65,9 +63,9 @@ public class VaultManager implements IgniteComponent {
* See {@link VaultService#get}.
*
* @param key Key. Cannot be {@code null}.
- * @return Future that resolves into an entry for the given key, or {@code
null} if no such mapping exists.
+ * @return Entry for the given key, or {@code null} if no such mapping
exists.
*/
- public CompletableFuture<VaultEntry> get(ByteArray key) {
+ public @Nullable VaultEntry get(ByteArray key) {
return vaultSvc.get(key);
}
@@ -76,20 +74,18 @@ public class VaultManager implements IgniteComponent {
*
* @param key Vault key. Cannot be {@code null}.
* @param val Value. If value is equal to {@code null}, then previous
value with key will be deleted if there was any mapping.
- * @return Future representing pending completion of the operation. Cannot
be {@code null}.
*/
- public CompletableFuture<Void> put(ByteArray key, byte @Nullable [] val) {
- return vaultSvc.put(key, val);
+ public void put(ByteArray key, byte @Nullable [] val) {
+ vaultSvc.put(key, val);
}
/**
* See {@link VaultService#remove}.
*
* @param key Vault key. Cannot be {@code null}.
- * @return Future representing pending completion of the operation. Cannot
be {@code null}.
*/
- public CompletableFuture<Void> remove(ByteArray key) {
- return vaultSvc.remove(key);
+ public void remove(ByteArray key) {
+ vaultSvc.remove(key);
}
/**
@@ -108,30 +104,30 @@ public class VaultManager implements IgniteComponent {
* value with key will be deleted if there was any mapping.
*
* @param vals The map of keys and corresponding values. Cannot be {@code
null} or empty.
- * @return Future representing pending completion of the operation. Cannot
be {@code null}.
*/
- public CompletableFuture<Void> putAll(Map<ByteArray, byte[]> vals) {
- return vaultSvc.putAll(vals);
+ public void putAll(Map<ByteArray, byte[]> vals) {
+ vaultSvc.putAll(vals);
}
/**
* Persist node name to the vault.
*
* @param name node name to persist. Cannot be null.
- * @return Future representing pending completion of the operation.
*/
- public CompletableFuture<Void> putName(String name) {
+ public void putName(String name) {
if (name.isBlank()) {
throw new IllegalArgumentException("Name must not be empty");
}
- return put(NODE_NAME, name.getBytes(UTF_8));
+ put(NODE_NAME, name.getBytes(UTF_8));
}
/**
- * Returns {@code CompletableFuture} which, when complete, returns the
node name, if was stored earlier, or {@code null} otherwise.
+ * Returns the node name, if was stored earlier, or {@code null} otherwise.
*/
- public CompletableFuture<String> name() {
- return vaultSvc.get(NODE_NAME).thenApply(name -> name == null ? null :
new String(name.value(), UTF_8));
+ public @Nullable String name() {
+ VaultEntry nameEntry = vaultSvc.get(NODE_NAME);
+
+ return nameEntry == null ? null : new String(nameEntry.value(), UTF_8);
}
}
diff --git
a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultService.java
b/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultService.java
index f1035d672f..277bcb3dc4 100644
---
a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultService.java
+++
b/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultService.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.vault;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.util.Cursor;
@@ -37,9 +36,9 @@ public interface VaultService extends ManuallyCloseable {
* Retrieves an entry for the given key.
*
* @param key Key. Cannot be {@code null}.
- * @return Future that resolves into an entry for the given key, or {@code
null} no such mapping exists.
+ * @return Entry for the given key, or {@code null} no such mapping exists.
*/
- CompletableFuture<VaultEntry> get(ByteArray key);
+ @Nullable VaultEntry get(ByteArray key);
/**
* Writes a given value to the Vault. If the value is {@code null}, then
the previous value under the same key (if any) will
@@ -48,17 +47,15 @@ public interface VaultService extends ManuallyCloseable {
* @param key Vault key. Cannot be {@code null}.
* @param val Value. If value is equal to {@code null}, then the previous
value under the same key (if any) will
* be deleted.
- * @return Future representing pending completion of the operation. Cannot
be {@code null}.
*/
- CompletableFuture<Void> put(ByteArray key, byte @Nullable [] val);
+ void put(ByteArray key, byte @Nullable [] val);
/**
* Removes a value from the vault.
*
* @param key Vault key. Cannot be {@code null}.
- * @return Future representing pending completion of the operation. Cannot
be {@code null}.
*/
- CompletableFuture<Void> remove(ByteArray key);
+ void remove(ByteArray key);
/**
* Returns a view of the portion of the vault whose keys range from {@code
fromKey}, inclusive, to {@code toKey}, exclusive.
@@ -74,9 +71,8 @@ public interface VaultService extends ManuallyCloseable {
* then the corresponding key will be deleted.
*
* @param vals The map of keys and corresponding values. Cannot be {@code
null}.
- * @return Future representing pending completion of the operation. Cannot
be {@code null}.
*/
- CompletableFuture<Void> putAll(Map<ByteArray, byte[]> vals);
+ void putAll(Map<ByteArray, byte[]> vals);
/**
* Closes the service.
diff --git
a/modules/vault/src/main/java/org/apache/ignite/internal/vault/persistence/PersistentVaultService.java
b/modules/vault/src/main/java/org/apache/ignite/internal/vault/persistence/PersistentVaultService.java
index 9ac8cea2d5..047cb8d7b0 100644
---
a/modules/vault/src/main/java/org/apache/ignite/internal/vault/persistence/PersistentVaultService.java
+++
b/modules/vault/src/main/java/org/apache/ignite/internal/vault/persistence/PersistentVaultService.java
@@ -19,21 +19,11 @@ package org.apache.ignite.internal.vault.persistence;
import java.nio.file.Path;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-import org.apache.ignite.internal.future.InFlightFutures;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.IgniteInternalException;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
import org.apache.ignite.internal.rocksdb.RocksUtils;
-import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.Cursor;
-import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.vault.VaultEntry;
import org.apache.ignite.internal.vault.VaultService;
import org.jetbrains.annotations.Nullable;
@@ -58,12 +48,6 @@ public class PersistentVaultService implements VaultService {
RocksDB.loadLibrary();
}
- private static final IgniteLogger LOG =
Loggers.forClass(PersistentVaultService.class);
-
- private final ExecutorService threadPool;
-
- private final InFlightFutures futureTracker = new InFlightFutures();
-
private final Options options = options();
private volatile RocksDB db;
@@ -74,13 +58,10 @@ public class PersistentVaultService implements VaultService
{
/**
* Creates persistent vault service.
*
- * @param nodeName Node name.
* @param path base path for RocksDB
*/
- public PersistentVaultService(String nodeName, Path path) {
+ public PersistentVaultService(Path path) {
this.path = path;
-
- threadPool = Executors.newFixedThreadPool(4,
NamedThreadFactory.create(nodeName, "vault", LOG));
}
private static Options options() {
@@ -112,59 +93,44 @@ public class PersistentVaultService implements
VaultService {
}
}
- /** {@inheritDoc} */
@Override
public void close() {
- IgniteUtils.shutdownAndAwaitTermination(threadPool, 10,
TimeUnit.SECONDS);
-
- futureTracker.cancelInFlightFutures();
-
- RocksUtils.closeAll(options, db);
+ RocksUtils.closeAll(db, options);
}
- /** {@inheritDoc} */
@Override
- public CompletableFuture<VaultEntry> get(ByteArray key) {
- return supplyAsync(() -> {
- try {
- byte[] value = db.get(key.bytes());
-
- return value == null ? null : new VaultEntry(key, value);
- } catch (RocksDBException e) {
- throw new IgniteInternalException("Unable to read data from
RocksDB", e);
- }
- });
+ public @Nullable VaultEntry get(ByteArray key) {
+ try {
+ byte[] value = db.get(key.bytes());
+
+ return value == null ? null : new VaultEntry(key, value);
+ } catch (RocksDBException e) {
+ throw new IgniteInternalException("Unable to read data from
RocksDB", e);
+ }
}
- /** {@inheritDoc} */
@Override
- public CompletableFuture<Void> put(ByteArray key, byte @Nullable [] val) {
- return runAsync(() -> {
- try {
- if (val == null) {
- db.delete(key.bytes());
- } else {
- db.put(key.bytes(), val);
- }
- } catch (RocksDBException e) {
- throw new IgniteInternalException("Unable to write data to
RocksDB", e);
+ public void put(ByteArray key, byte @Nullable [] val) {
+ try {
+ if (val == null) {
+ db.delete(key.bytes());
+ } else {
+ db.put(key.bytes(), val);
}
- });
+ } catch (RocksDBException e) {
+ throw new IgniteInternalException("Unable to write data to
RocksDB", e);
+ }
}
- /** {@inheritDoc} */
@Override
- public CompletableFuture<Void> remove(ByteArray key) {
- return runAsync(() -> {
- try {
- db.delete(key.bytes());
- } catch (RocksDBException e) {
- throw new IgniteInternalException("Unable to remove data to
RocksDB", e);
- }
- });
+ public void remove(ByteArray key) {
+ try {
+ db.delete(key.bytes());
+ } catch (RocksDBException e) {
+ throw new IgniteInternalException("Unable to remove data to
RocksDB", e);
+ }
}
- /** {@inheritDoc} */
@Override
public Cursor<VaultEntry> range(ByteArray fromKey, ByteArray toKey) {
var readOpts = new ReadOptions();
@@ -187,47 +153,28 @@ public class PersistentVaultService implements
VaultService {
public void close() {
super.close();
- RocksUtils.closeAll(upperBound, readOpts);
+ RocksUtils.closeAll(readOpts, upperBound);
}
};
}
- /** {@inheritDoc} */
@Override
- public CompletableFuture<Void> putAll(Map<ByteArray, byte[]> vals) {
- return runAsync(() -> {
- try (
- var writeBatch = new WriteBatch();
- var writeOpts = new WriteOptions()
- ) {
- for (var entry : vals.entrySet()) {
- if (entry.getValue() == null) {
- writeBatch.delete(entry.getKey().bytes());
- } else {
- writeBatch.put(entry.getKey().bytes(),
entry.getValue());
- }
+ public void putAll(Map<ByteArray, byte[]> vals) {
+ try (
+ var writeBatch = new WriteBatch();
+ var writeOpts = new WriteOptions()
+ ) {
+ for (var entry : vals.entrySet()) {
+ if (entry.getValue() == null) {
+ writeBatch.delete(entry.getKey().bytes());
+ } else {
+ writeBatch.put(entry.getKey().bytes(), entry.getValue());
}
-
- db.write(writeOpts, writeBatch);
- } catch (RocksDBException e) {
- throw new IgniteInternalException("Unable to write data to
RocksDB", e);
}
- });
- }
- private <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier) {
- CompletableFuture<T> future = CompletableFuture.supplyAsync(supplier,
threadPool);
-
- futureTracker.registerFuture(future);
-
- return future;
- }
-
- private CompletableFuture<Void> runAsync(Runnable runnable) {
- CompletableFuture<Void> future = CompletableFuture.runAsync(runnable,
threadPool);
-
- futureTracker.registerFuture(future);
-
- return future;
+ db.write(writeOpts, writeBatch);
+ } catch (RocksDBException e) {
+ throw new IgniteInternalException("Unable to write data to
RocksDB", e);
+ }
}
}
diff --git
a/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultManagerTest.java
b/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultManagerTest.java
index b84af179fb..320d599a54 100644
---
a/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultManagerTest.java
+++
b/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultManagerTest.java
@@ -17,9 +17,9 @@
package org.apache.ignite.internal.vault;
-import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -54,15 +54,15 @@ public class VaultManagerTest {
*/
@Test
void testName() {
- assertThat(vaultManager.name(), willBe(nullValue(String.class)));
+ assertThat(vaultManager.name(), is(nullValue(String.class)));
- assertThat(vaultManager.putName("foobar"),
willBe(nullValue(Void.class)));
+ vaultManager.putName("foobar");
- assertThat(vaultManager.name(), willBe(equalTo("foobar")));
+ assertThat(vaultManager.name(), is(equalTo("foobar")));
- assertThat(vaultManager.putName("foobarbaz"),
willBe(nullValue(Void.class)));
+ vaultManager.putName("foobarbaz");
- assertThat(vaultManager.name(), willBe(equalTo("foobarbaz")));
+ assertThat(vaultManager.name(), is(equalTo("foobarbaz")));
}
/**
diff --git
a/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultServiceTest.java
b/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultServiceTest.java
index c863eb928f..3a60f8b53c 100644
---
a/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultServiceTest.java
+++
b/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultServiceTest.java
@@ -19,13 +19,13 @@ package org.apache.ignite.internal.vault;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
-import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -76,18 +76,18 @@ public abstract class VaultServiceTest {
public void testPut() {
ByteArray key = getKey(1);
- assertThat(vaultService.get(key), willBe(nullValue(VaultEntry.class)));
+ assertThat(vaultService.get(key), is(nullValue(VaultEntry.class)));
byte[] val = getValue(1);
- assertThat(vaultService.put(key, val), willBe(nullValue(Void.class)));
+ vaultService.put(key, val);
- assertThat(vaultService.get(key), willBe(equalTo(new VaultEntry(key,
val))));
+ assertThat(vaultService.get(key), is(equalTo(new VaultEntry(key,
val))));
// test idempotency
- assertThat(vaultService.put(key, val), willBe(nullValue(Void.class)));
+ vaultService.put(key, val);
- assertThat(vaultService.get(key), willBe(equalTo(new VaultEntry(key,
val))));
+ assertThat(vaultService.get(key), is(equalTo(new VaultEntry(key,
val))));
}
/**
@@ -99,13 +99,13 @@ public abstract class VaultServiceTest {
byte[] val = getValue(1);
- assertThat(vaultService.put(key, val), willBe(nullValue(Void.class)));
+ vaultService.put(key, val);
- assertThat(vaultService.get(key), willBe(equalTo(new VaultEntry(key,
val))));
+ assertThat(vaultService.get(key), is(equalTo(new VaultEntry(key,
val))));
- assertThat(vaultService.put(key, null), willBe(nullValue(Void.class)));
+ vaultService.put(key, null);
- assertThat(vaultService.get(key), willBe(nullValue(VaultEntry.class)));
+ assertThat(vaultService.get(key), is(nullValue(VaultEntry.class)));
}
/**
@@ -116,20 +116,20 @@ public abstract class VaultServiceTest {
ByteArray key = getKey(1);
// Remove non-existent value.
- assertThat(vaultService.remove(key), willBe(nullValue(Void.class)));
+ assertDoesNotThrow(() -> vaultService.remove(key));
- assertThat(vaultService.get(key), willBe(nullValue(VaultEntry.class)));
+ assertThat(vaultService.get(key), is(nullValue(VaultEntry.class)));
byte[] val = getValue(1);
- assertThat(vaultService.put(key, val), willBe(nullValue(Void.class)));
+ vaultService.put(key, val);
- assertThat(vaultService.get(key), willBe(equalTo(new VaultEntry(key,
val))));
+ assertThat(vaultService.get(key), is(equalTo(new VaultEntry(key,
val))));
// Remove existing value.
- assertThat(vaultService.remove(key), willBe(nullValue(Void.class)));
+ vaultService.remove(key);
- assertThat(vaultService.get(key), willBe(nullValue(VaultEntry.class)));
+ assertThat(vaultService.get(key), is(nullValue(VaultEntry.class)));
}
/**
@@ -141,13 +141,13 @@ public abstract class VaultServiceTest {
.boxed()
.collect(toMap(VaultServiceTest::getKey,
VaultServiceTest::getValue));
- assertThat(vaultService.putAll(batch), willBe(nullValue(Void.class)));
+ vaultService.putAll(batch);
- batch.forEach((k, v) -> assertThat(vaultService.get(k),
willBe(equalTo(new VaultEntry(k, v)))));
+ batch.forEach((k, v) -> assertThat(vaultService.get(k), is(equalTo(new
VaultEntry(k, v)))));
- assertThat(vaultService.putAll(batch), willBe(nullValue(Void.class)));
+ vaultService.putAll(batch);
- batch.forEach((k, v) -> assertThat(vaultService.get(k),
willBe(equalTo(new VaultEntry(k, v)))));
+ batch.forEach((k, v) -> assertThat(vaultService.get(k), is(equalTo(new
VaultEntry(k, v)))));
}
/**
@@ -159,9 +159,9 @@ public abstract class VaultServiceTest {
.boxed()
.collect(toMap(VaultServiceTest::getKey,
VaultServiceTest::getValue));
- assertThat(vaultService.putAll(batch), willBe(nullValue(Void.class)));
+ vaultService.putAll(batch);
- batch.forEach((k, v) -> assertThat(vaultService.get(k),
willBe(equalTo(new VaultEntry(k, v)))));
+ batch.forEach((k, v) -> assertThat(vaultService.get(k), is(equalTo(new
VaultEntry(k, v)))));
Map<ByteArray, byte[]> secondBatch = new HashMap<>();
@@ -170,12 +170,12 @@ public abstract class VaultServiceTest {
secondBatch.put(getKey(1), null);
secondBatch.put(getKey(3), null);
- assertThat(vaultService.putAll(secondBatch),
willBe(nullValue(Void.class)));
+ vaultService.putAll(secondBatch);
- assertThat(vaultService.get(getKey(4)), willBe(equalTo(new
VaultEntry(getKey(4), getValue(3)))));
- assertThat(vaultService.get(getKey(8)), willBe(equalTo(new
VaultEntry(getKey(8), getValue(3)))));
- assertThat(vaultService.get(getKey(1)),
willBe(nullValue(VaultEntry.class)));
- assertThat(vaultService.get(getKey(3)),
willBe(nullValue(VaultEntry.class)));
+ assertThat(vaultService.get(getKey(4)), is(equalTo(new
VaultEntry(getKey(4), getValue(3)))));
+ assertThat(vaultService.get(getKey(8)), is(equalTo(new
VaultEntry(getKey(8), getValue(3)))));
+ assertThat(vaultService.get(getKey(1)),
is(nullValue(VaultEntry.class)));
+ assertThat(vaultService.get(getKey(3)),
is(nullValue(VaultEntry.class)));
}
/**
@@ -187,7 +187,7 @@ public abstract class VaultServiceTest {
Map<ByteArray, byte[]> batch =
entries.stream().collect(toMap(VaultEntry::key, VaultEntry::value));
- assertThat(vaultService.putAll(batch), willBe(nullValue(Void.class)));
+ vaultService.putAll(batch);
List<VaultEntry> range = range(getKey(3), getKey(7));
@@ -203,7 +203,7 @@ public abstract class VaultServiceTest {
Map<ByteArray, byte[]> batch =
entries.stream().collect(toMap(VaultEntry::key, VaultEntry::value));
- assertThat(vaultService.putAll(batch), willBe(nullValue(Void.class)));
+ vaultService.putAll(batch);
List<VaultEntry> range = range(getKey(0), getKey(9));
@@ -219,7 +219,7 @@ public abstract class VaultServiceTest {
Map<ByteArray, byte[]> batch =
entries.stream().collect(toMap(VaultEntry::key, VaultEntry::value));
- assertThat(vaultService.putAll(batch), willBe(nullValue(Void.class)));
+ vaultService.putAll(batch);
List<VaultEntry> range = range(getKey(3), getKey(4));
@@ -233,7 +233,7 @@ public abstract class VaultServiceTest {
public void testRangeInvalidBoundaries() throws Exception {
Map<ByteArray, byte[]> batch = getRange(3,
5).stream().collect(toMap(VaultEntry::key, VaultEntry::value));
- assertThat(vaultService.putAll(batch), willBe(nullValue(Void.class)));
+ vaultService.putAll(batch);
List<VaultEntry> range = range(getKey(4), getKey(1));
diff --git
a/modules/vault/src/test/java/org/apache/ignite/internal/vault/persistence/ItPersistencePropertiesVaultServiceTest.java
b/modules/vault/src/test/java/org/apache/ignite/internal/vault/persistence/ItPersistencePropertiesVaultServiceTest.java
index 134514361e..291a56bf1a 100644
---
a/modules/vault/src/test/java/org/apache/ignite/internal/vault/persistence/ItPersistencePropertiesVaultServiceTest.java
+++
b/modules/vault/src/test/java/org/apache/ignite/internal/vault/persistence/ItPersistencePropertiesVaultServiceTest.java
@@ -18,11 +18,9 @@
package org.apache.ignite.internal.vault.persistence;
import static java.util.stream.Collectors.toList;
-import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.nullValue;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
@@ -48,39 +46,37 @@ class ItPersistencePropertiesVaultServiceTest {
* Tests that the Vault Service correctly persists data after multiple
service restarts.
*/
@Test
- void testPersistentRestart() throws Exception {
+ void testPersistentRestart() {
var data = Map.of(
new ByteArray("key" + 1), fromString("value" + 1),
new ByteArray("key" + 2), fromString("value" + 2),
new ByteArray("key" + 3), fromString("value" + 3)
);
- String nodeName = "test";
-
- var service = new PersistentVaultService(nodeName, vaultDir);
+ var service = new PersistentVaultService(vaultDir);
try {
service.start();
- assertThat(service.putAll(data), willBe(nullValue(Void.class)));
+ service.putAll(data);
} finally {
service.close();
}
- service = new PersistentVaultService(nodeName, vaultDir);
+ service = new PersistentVaultService(vaultDir);
try {
service.start();
assertThat(
service.get(new ByteArray("key" + 1)),
- willBe(equalTo(new VaultEntry(new ByteArray("key" + 1),
fromString("value" + 1))))
+ is(equalTo(new VaultEntry(new ByteArray("key" + 1),
fromString("value" + 1))))
);
} finally {
service.close();
}
- service = new PersistentVaultService(nodeName, vaultDir);
+ service = new PersistentVaultService(vaultDir);
try {
service.start();
diff --git
a/modules/vault/src/test/java/org/apache/ignite/internal/vault/persistence/ItPersistentVaultServiceTest.java
b/modules/vault/src/test/java/org/apache/ignite/internal/vault/persistence/ItPersistentVaultServiceTest.java
index 521cd3ddc4..1a8a077e91 100644
---
a/modules/vault/src/test/java/org/apache/ignite/internal/vault/persistence/ItPersistentVaultServiceTest.java
+++
b/modules/vault/src/test/java/org/apache/ignite/internal/vault/persistence/ItPersistentVaultServiceTest.java
@@ -35,6 +35,6 @@ class ItPersistentVaultServiceTest extends VaultServiceTest {
/** {@inheritDoc} */
@Override
protected VaultService getVaultService() {
- return new PersistentVaultService("test", vaultDir);
+ return new PersistentVaultService(vaultDir);
}
}
diff --git
a/modules/vault/src/testFixtures/java/org/apache/ignite/internal/vault/inmemory/InMemoryVaultService.java
b/modules/vault/src/testFixtures/java/org/apache/ignite/internal/vault/inmemory/InMemoryVaultService.java
index 6dc5916ff5..87188e0afd 100644
---
a/modules/vault/src/testFixtures/java/org/apache/ignite/internal/vault/inmemory/InMemoryVaultService.java
+++
b/modules/vault/src/testFixtures/java/org/apache/ignite/internal/vault/inmemory/InMemoryVaultService.java
@@ -17,17 +17,15 @@
package org.apache.ignite.internal.vault.inmemory;
-import static java.util.concurrent.CompletableFuture.runAsync;
-import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.toList;
-import java.util.Collections;
-import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
-import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.CursorUtils;
import org.apache.ignite.internal.vault.VaultEntry;
import org.apache.ignite.internal.vault.VaultService;
import org.jetbrains.annotations.Nullable;
@@ -47,75 +45,57 @@ public class InMemoryVaultService implements VaultService {
// No-op.
}
- /** {@inheritDoc} */
@Override
public void close() {
// No-op.
}
- /** {@inheritDoc} */
@Override
- public CompletableFuture<VaultEntry> get(ByteArray key) {
- return supplyAsync(() -> {
- synchronized (mux) {
- byte[] value = storage.get(key);
+ public @Nullable VaultEntry get(ByteArray key) {
+ synchronized (mux) {
+ byte[] value = storage.get(key);
- return value == null ? null : new VaultEntry(key,
storage.get(key));
- }
- });
+ return value == null ? null : new VaultEntry(key, value);
+ }
}
- /** {@inheritDoc} */
@Override
- public CompletableFuture<Void> put(ByteArray key, byte @Nullable [] val) {
- return runAsync(() -> {
- synchronized (mux) {
- storage.put(key, val);
- }
- });
+ public void put(ByteArray key, byte @Nullable [] val) {
+ synchronized (mux) {
+ storage.put(key, val);
+ }
}
- /** {@inheritDoc} */
@Override
- public CompletableFuture<Void> remove(ByteArray key) {
- return runAsync(() -> {
- synchronized (mux) {
- storage.remove(key);
- }
- });
+ public void remove(ByteArray key) {
+ synchronized (mux) {
+ storage.remove(key);
+ }
}
- /** {@inheritDoc} */
@Override
public Cursor<VaultEntry> range(ByteArray fromKey, ByteArray toKey) {
- Iterator<VaultEntry> it;
-
if (fromKey.compareTo(toKey) >= 0) {
- it = Collections.emptyIterator();
- } else {
- synchronized (mux) {
- it = storage.subMap(fromKey, toKey).entrySet().stream()
- .map(e -> new VaultEntry(new ByteArray(e.getKey()),
e.getValue()))
- .iterator();
- }
+ return CursorUtils.emptyCursor();
}
- return Cursor.fromBareIterator(it);
+ synchronized (mux) {
+ return storage.subMap(fromKey, toKey).entrySet().stream()
+ .map(e -> new VaultEntry(new ByteArray(e.getKey()),
e.getValue()))
+ .collect(collectingAndThen(toList(),
Cursor::fromIterable));
+ }
}
- /** {@inheritDoc} */
@Override
- public CompletableFuture<Void> putAll(Map<ByteArray, byte[]> vals) {
- return runAsync(() -> {
- synchronized (mux) {
- for (var entry : vals.entrySet()) {
- if (entry.getValue() == null) {
- storage.remove(entry.getKey());
- } else {
- storage.put(entry.getKey(), entry.getValue());
- }
+ public void putAll(Map<ByteArray, byte[]> vals) {
+ synchronized (mux) {
+ for (Map.Entry<ByteArray, byte[]> entry : vals.entrySet()) {
+ if (entry.getValue() == null) {
+ storage.remove(entry.getKey());
+ } else {
+ storage.put(entry.getKey(), entry.getValue());
}
}
- });
+ }
}
}