This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d7d0f741e3 IGNITE-18529 Deal with deleting a table/partition and
rebalancing partitions (#1745)
d7d0f741e3 is described below
commit d7d0f741e35c85fc63c28466c4caad93d160e9e1
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Mar 7 11:42:54 2023 +0300
IGNITE-18529 Deal with deleting a table/partition and rebalancing
partitions (#1745)
---
docs/_docs/glossary/glossary.adoc | 2 +-
.../internal/storage/util/MvPartitionStorages.java | 158 +++++++--------------
.../internal/storage/util/StorageOperation.java | 87 ++++++++++--
.../storage/util/MvPartitionStoragesTest.java | 154 +++++++++++++++-----
.../storage/AbstractMvTableStorageTest.java | 18 +++
.../storage/impl/TestMvPartitionStorage.java | 2 -
.../internal/storage/impl/TestMvTableStorage.java | 6 +-
.../pagememory/AbstractPageMemoryTableStorage.java | 14 +-
.../mv/AbstractPageMemoryMvPartitionStorage.java | 6 +-
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 6 +-
.../storage/rocksdb/RocksDbTableStorage.java | 27 ++--
11 files changed, 304 insertions(+), 176 deletions(-)
diff --git a/docs/_docs/glossary/glossary.adoc
b/docs/_docs/glossary/glossary.adoc
index b43156752d..d3c5393817 100644
--- a/docs/_docs/glossary/glossary.adoc
+++ b/docs/_docs/glossary/glossary.adoc
@@ -22,7 +22,7 @@ Cluster Management Group::A subset of Ignite nodes in a Raft
cluster. Cluster gr
Data Region:: Data regions are used to control the amount of memory available
to the storage. Depending on the type of storage the data region is assigned
to, the data may be loaded into RAM or stored
-Data Rebalace:: Data rebalance is the process of redistributing partitions to
make sure they are distributed equally across all nodes in the cluster.
+Data Rebalance:: Data rebalance is the process of redistributing partitions to
make sure they are distributed equally across all nodes in the cluster.
==== M
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/MvPartitionStorages.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/MvPartitionStorages.java
index 8f8ce9fcbe..cfb9d6446e 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/MvPartitionStorages.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/MvPartitionStorages.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.StorageRebalanceException;
import
org.apache.ignite.internal.storage.util.StorageOperation.AbortRebalanceStorageOperation;
import
org.apache.ignite.internal.storage.util.StorageOperation.CleanupStorageOperation;
+import
org.apache.ignite.internal.storage.util.StorageOperation.CloseStorageOperation;
import
org.apache.ignite.internal.storage.util.StorageOperation.CreateStorageOperation;
import
org.apache.ignite.internal.storage.util.StorageOperation.DestroyStorageOperation;
import
org.apache.ignite.internal.storage.util.StorageOperation.FinishRebalanceStorageOperation;
@@ -54,7 +55,7 @@ public class MvPartitionStorages<T extends
MvPartitionStorage> {
private final ConcurrentMap<Integer, StorageOperation>
operationByPartitionId = new ConcurrentHashMap<>();
- private final ConcurrentMap<Integer, CompletableFuture<Void>>
rebalaceFutureByPartitionId = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Integer, CompletableFuture<Void>>
rebalanceFutureByPartitionId = new ConcurrentHashMap<>();
/**
* Constructor.
@@ -127,7 +128,7 @@ public class MvPartitionStorages<T extends
MvPartitionStorage> {
}).whenComplete((storage, throwable) ->
operationByPartitionId.compute(partitionId, (partId, operation) -> {
assert operation instanceof CreateStorageOperation :
createStorageInfo(partitionId) + ", op=" + operation;
- return null;
+ return completeOperation(operation);
}));
}
@@ -158,9 +159,7 @@ public class MvPartitionStorages<T extends
MvPartitionStorage> {
operationByPartitionId.compute(partitionId, (partId,
operation) -> {
assert operation instanceof DestroyStorageOperation :
createStorageInfo(partitionId) + ", op=" + operation;
- DestroyStorageOperation destroyStorageOperation =
(DestroyStorageOperation) operation;
-
- return
destroyStorageOperation.getCreateStorageOperation();
+ return completeOperation(operation);
});
if (throwable == null) {
@@ -198,7 +197,7 @@ public class MvPartitionStorages<T extends
MvPartitionStorage> {
operationByPartitionId.compute(partitionId, (partId,
operation) -> {
assert operation instanceof
CleanupStorageOperation : createStorageInfo(partitionId) + ", op=" + operation;
- return null;
+ return completeOperation(operation);
})
);
}
@@ -213,7 +212,7 @@ public class MvPartitionStorages<T extends
MvPartitionStorage> {
* @throws StorageRebalanceException If the storage does not exist or
another operation is already in progress.
* @throws StorageRebalanceException If rebalancing is already in progress.
*/
- public CompletableFuture<Void> startRebalace(int partitionId, Function<T,
CompletableFuture<Void>> startRebalanceStorageFunction) {
+ public CompletableFuture<Void> startRebalance(int partitionId, Function<T,
CompletableFuture<Void>> startRebalanceStorageFunction) {
operationByPartitionId.compute(partitionId, (partId, operation) -> {
checkStorageExistsForRebalance(partitionId);
@@ -221,7 +220,7 @@ public class MvPartitionStorages<T extends
MvPartitionStorage> {
throwExceptionDependingOnOperationForRebalance(operation,
partitionId);
}
- if (rebalaceFutureByPartitionId.containsKey(partitionId)) {
+ if (rebalanceFutureByPartitionId.containsKey(partitionId)) {
throw new
StorageRebalanceException(createStorageInProgressOfRebalanceErrorMessage(partitionId));
}
@@ -232,7 +231,7 @@ public class MvPartitionStorages<T extends
MvPartitionStorage> {
.thenCompose(unused -> {
CompletableFuture<Void> startRebalanceFuture =
startRebalanceStorageFunction.apply(get(partitionId));
- CompletableFuture<Void> old =
rebalaceFutureByPartitionId.put(partitionId, startRebalanceFuture);
+ CompletableFuture<Void> old =
rebalanceFutureByPartitionId.put(partitionId, startRebalanceFuture);
assert old == null : createStorageInfo(partitionId);
@@ -242,7 +241,7 @@ public class MvPartitionStorages<T extends
MvPartitionStorage> {
assert operation instanceof
StartRebalanceStorageOperation :
createStorageInfo(partitionId) + ", op=" +
operation;
- return null;
+ return completeOperation(operation);
})
);
}
@@ -269,7 +268,7 @@ public class MvPartitionStorages<T extends
MvPartitionStorage> {
return completedFuture(null)
.thenCompose(unused -> {
- CompletableFuture<Void> rebalanceFuture =
rebalaceFutureByPartitionId.remove(partitionId);
+ CompletableFuture<Void> rebalanceFuture =
rebalanceFutureByPartitionId.remove(partitionId);
if (rebalanceFuture == null) {
return completedFuture(null);
@@ -283,7 +282,7 @@ public class MvPartitionStorages<T extends
MvPartitionStorage> {
assert operation instanceof
AbortRebalanceStorageOperation :
createStorageInfo(partitionId) + ", op=" +
operation;
- return null;
+ return completeOperation(operation);
})
);
}
@@ -306,7 +305,7 @@ public class MvPartitionStorages<T extends
MvPartitionStorage> {
throwExceptionDependingOnOperationForRebalance(operation,
partitionId);
}
- if (!rebalaceFutureByPartitionId.containsKey(partitionId)) {
+ if (!rebalanceFutureByPartitionId.containsKey(partitionId)) {
throw new StorageRebalanceException("Storage rebalancing did
not start: [" + createStorageInfo(partitionId) + ']');
}
@@ -315,7 +314,7 @@ public class MvPartitionStorages<T extends
MvPartitionStorage> {
return completedFuture(null)
.thenCompose(unused -> {
- CompletableFuture<Void> rebalanceFuture =
rebalaceFutureByPartitionId.remove(partitionId);
+ CompletableFuture<Void> rebalanceFuture =
rebalanceFutureByPartitionId.remove(partitionId);
assert rebalanceFuture != null :
createStorageInfo(partitionId);
@@ -325,49 +324,11 @@ public class MvPartitionStorages<T extends
MvPartitionStorage> {
assert operation instanceof
FinishRebalanceStorageOperation :
createStorageInfo(partitionId) + ", op=" +
operation;
- return null;
+ return completeOperation(operation);
})
);
}
- /**
- * Collects all multi-versioned partition storages to close.
- */
- // TODO: IGNITE-18529 We need to wait for all current operations and
disable new ones
- public List<T> getAllForClose() {
- return IntStream.range(0, storageByPartitionId.length())
- .mapToObj(partitionId ->
storageByPartitionId.getAndSet(partitionId, null))
- .filter(Objects::nonNull)
- .collect(toList());
- }
-
- /**
- * Destroys all created multi-versioned partition storages.
- *
- * @param destroyStorageFunction Partition destruction function.
- * @return Future destruction of all created multi-versioned partition
storages.
- */
- // TODO: IGNITE-18529 We need to deal with parallel operations
- public CompletableFuture<Void> destroyAll(Function<T,
CompletableFuture<Void>> destroyStorageFunction) {
- List<CompletableFuture<Void>> destroyFutures = new ArrayList<>();
-
- for (int partitionId = 0; partitionId < storageByPartitionId.length();
partitionId++) {
- StorageOperation storageOperation =
operationByPartitionId.get(partitionId);
-
- if (storageOperation instanceof DestroyStorageOperation) {
- destroyFutures.add(((DestroyStorageOperation)
storageOperation).getDestroyFuture());
- } else {
- T storage = storageByPartitionId.getAndSet(partitionId, null);
-
- if (storage != null) {
- destroyFutures.add(destroyStorageFunction.apply(storage));
- }
- }
- }
-
- return
CompletableFuture.allOf(destroyFutures.toArray(CompletableFuture[]::new));
- }
-
/**
* Returns table name.
*/
@@ -430,74 +391,63 @@ public class MvPartitionStorages<T extends
MvPartitionStorage> {
}
private void throwExceptionDependingOnOperation(StorageOperation
operation, int partitionId) {
- if (operation instanceof CreateStorageOperation) {
- throw new
StorageException(createStorageInProgressOfCreationErrorMessage(partitionId));
- } else if (operation instanceof DestroyStorageOperation) {
- throw new
StorageException(createStorageInProgressOfDestructionErrorMessage(partitionId));
- } else if (operation instanceof StartRebalanceStorageOperation) {
- throw new
StorageRebalanceException(createStorageInProgressOfStartRebalanceErrorMessage(partitionId));
- } else if (operation instanceof AbortRebalanceStorageOperation) {
- throw new
StorageRebalanceException(createStorageInProgressOfAbortRebalanceErrorMessage(partitionId));
- } else if (operation instanceof FinishRebalanceStorageOperation) {
- throw new
StorageRebalanceException(createStorageInProgressOfFinishRebalanceErrorMessage(partitionId));
- } else if (operation instanceof CleanupStorageOperation) {
- throw new
StorageException(createStorageInProgressOfCleanupErrorMessage(partitionId));
- } else {
- throw new
StorageException(createUnknownOperationErrorMessage(partitionId, operation));
- }
+ throw new
StorageException(operation.inProcessErrorMessage(createStorageInfo(partitionId)));
}
private void
throwExceptionDependingOnOperationForRebalance(StorageOperation operation, int
partitionId) {
- if (operation instanceof CreateStorageOperation) {
- throw new
StorageRebalanceException(createStorageInProgressOfCreationErrorMessage(partitionId));
- } else if (operation instanceof DestroyStorageOperation) {
- throw new
StorageRebalanceException(createStorageInProgressOfDestructionErrorMessage(partitionId));
- } else if (operation instanceof StartRebalanceStorageOperation) {
- throw new
StorageRebalanceException(createStorageInProgressOfStartRebalanceErrorMessage(partitionId));
- } else if (operation instanceof AbortRebalanceStorageOperation) {
- throw new
StorageRebalanceException(createStorageInProgressOfAbortRebalanceErrorMessage(partitionId));
- } else if (operation instanceof FinishRebalanceStorageOperation) {
- throw new
StorageRebalanceException(createStorageInProgressOfFinishRebalanceErrorMessage(partitionId));
- } else if (operation instanceof CleanupStorageOperation) {
- throw new
StorageRebalanceException(createStorageInProgressOfCleanupErrorMessage(partitionId));
- } else {
- throw new
StorageRebalanceException(createUnknownOperationErrorMessage(partitionId,
operation));
- }
+ throw new
StorageRebalanceException(operation.inProcessErrorMessage(createStorageInfo(partitionId)));
}
private String createStorageDoesNotExistErrorMessage(int partitionId) {
return "Storage does not exist: [" + createStorageInfo(partitionId) +
']';
}
- private String createStorageInProgressOfCreationErrorMessage(int
partitionId) {
- return "Storage is in process of being created: [" +
createStorageInfo(partitionId) + ']';
+ private String createStorageInProgressOfRebalanceErrorMessage(int
partitionId) {
+ return "Storage in the process of rebalance: [" +
createStorageInfo(partitionId) + ']';
}
- private String createStorageInProgressOfDestructionErrorMessage(int
partitionId) {
- return "Storage is already in process of being destroyed: [" +
createStorageInfo(partitionId) + ']';
- }
+ private static @Nullable StorageOperation
completeOperation(StorageOperation operation) {
+ operation.operationFuture().complete(null);
- private String createStorageInProgressOfStartRebalanceErrorMessage(int
partitionId) {
- return "Storage in the process of starting a rebalance: [" +
createStorageInfo(partitionId) + ']';
- }
+ if (operation.isFinalOperation()) {
+ return operation;
+ }
- private String createStorageInProgressOfAbortRebalanceErrorMessage(int
partitionId) {
- return "Storage in the process of aborting a rebalance: [" +
createStorageInfo(partitionId) + ']';
+ return operation instanceof DestroyStorageOperation ?
((DestroyStorageOperation) operation).getCreateStorageOperation() : null;
}
- private String createStorageInProgressOfFinishRebalanceErrorMessage(int
partitionId) {
- return "Storage in the process of finishing a rebalance: [" +
createStorageInfo(partitionId) + ']';
- }
+ /**
+ * Returns all storages for closing or destroying after completion of
operations for all storages.
+ *
+ * <p>After completing the future, when try to perform any operation,
{@link StorageException} for all storages will be thrown.
+ *
+ * @return Future that at the complete will return all the storages that
are not destroyed.
+ */
+ public CompletableFuture<List<T>> getAllForCloseOrDestroy() {
+ List<CompletableFuture<Void>> operationFutures = new ArrayList<>();
- private String createStorageInProgressOfRebalanceErrorMessage(int
partitionId) {
- return "Storage in the process of rebalance: [" +
createStorageInfo(partitionId) + ']';
- }
+ for (int partitionId = 0; partitionId < storageByPartitionId.length();
partitionId++) {
+ StorageOperation storageOperation =
operationByPartitionId.compute(partitionId, (partId, operation) -> {
+ if (operation == null) {
+ operation = new CloseStorageOperation();
+ }
- private String createStorageInProgressOfCleanupErrorMessage(int
partitionId) {
- return "Storage is in process of being cleaned up: [" +
createStorageInfo(partitionId) + ']';
- }
+ operation.markFinalOperation();
+
+ return operation;
+ });
- private String createUnknownOperationErrorMessage(int partitionId,
StorageOperation operation) {
- return "Unknown operation: [" + createStorageInfo(partitionId) + ",
operation=" + operation + ']';
+ if (!(storageOperation instanceof CloseStorageOperation)) {
+ operationFutures.add(storageOperation.operationFuture());
+ }
+ }
+
+ return
CompletableFuture.allOf(operationFutures.toArray(CompletableFuture[]::new))
+ .thenApply(unused ->
+ IntStream.range(0, storageByPartitionId.length())
+ .mapToObj(partitionId ->
storageByPartitionId.getAndSet(partitionId, null))
+ .filter(Objects::nonNull)
+ .collect(toList())
+ );
}
}
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageOperation.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageOperation.java
index 09b8156cde..81c1a57e45 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageOperation.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageOperation.java
@@ -24,17 +24,53 @@ import org.jetbrains.annotations.Nullable;
/**
* Storage operations.
*/
-interface StorageOperation {
+abstract class StorageOperation {
+ private final CompletableFuture<Void> operationFuture = new
CompletableFuture<>();
+
+ private volatile boolean finalOperation;
+
+ /**
+ * Returns future completion of the operation.
+ */
+ CompletableFuture<Void> operationFuture() {
+ return operationFuture;
+ }
+
+ /**
+ * Return {@code true} if the operation is the final.
+ */
+ boolean isFinalOperation() {
+ return finalOperation;
+ }
+
+ /**
+ * Marks the operation as final.
+ */
+ void markFinalOperation() {
+ finalOperation = true;
+ }
+
+ /**
+ * Creates an error message indicating that the current operation is in
progress or closed.
+ *
+ * @param storageInfo Storage information in the format "table=user,
partitionId=1".
+ */
+ abstract String inProcessErrorMessage(String storageInfo);
+
/**
* Storage creation operation.
*/
- class CreateStorageOperation implements StorageOperation {
+ static class CreateStorageOperation extends StorageOperation {
+ @Override
+ String inProcessErrorMessage(String storageInfo) {
+ return "Storage is in process of being created: [" + storageInfo +
']';
+ }
}
/**
* Storage destruction operation.
*/
- class DestroyStorageOperation implements StorageOperation {
+ static class DestroyStorageOperation extends StorageOperation {
private final CompletableFuture<Void> destroyFuture = new
CompletableFuture<>();
private final AtomicReference<CreateStorageOperation>
createStorageOperationReference = new AtomicReference<>();
@@ -45,46 +81,77 @@ interface StorageOperation {
* @param createStorageOperation Storage creation operation.
* @return {@code True} if the operation was set by current method
invocation, {@code false} if by another method invocation.
*/
- public boolean setCreationOperation(CreateStorageOperation
createStorageOperation) {
+ boolean setCreationOperation(CreateStorageOperation
createStorageOperation) {
return createStorageOperationReference.compareAndSet(null,
createStorageOperation);
}
/**
* Returns {@link #setCreationOperation(CreateStorageOperation) set} a
storage creation operation.
*/
- public @Nullable CreateStorageOperation getCreateStorageOperation() {
+ @Nullable CreateStorageOperation getCreateStorageOperation() {
return createStorageOperationReference.get();
}
/**
* Returns the storage destruction future.
*/
- public CompletableFuture<Void> getDestroyFuture() {
+ CompletableFuture<Void> getDestroyFuture() {
return destroyFuture;
}
+
+ @Override
+ String inProcessErrorMessage(String storageInfo) {
+ return "Storage is already in process of being destroyed: [" +
storageInfo + ']';
+ }
}
/**
* Storage rebalancing start operation.
*/
- class StartRebalanceStorageOperation implements StorageOperation {
+ static class StartRebalanceStorageOperation extends StorageOperation {
+ @Override
+ String inProcessErrorMessage(String storageInfo) {
+ return "Storage in the process of starting a rebalance: [" +
storageInfo + ']';
+ }
}
/**
* Storage rebalancing abort operation.
*/
- class AbortRebalanceStorageOperation implements StorageOperation {
+ static class AbortRebalanceStorageOperation extends StorageOperation {
+ @Override
+ String inProcessErrorMessage(String storageInfo) {
+ return "Storage in the process of aborting a rebalance: [" +
storageInfo + ']';
+ }
}
/**
* Storage rebalancing finish operation.
*/
- class FinishRebalanceStorageOperation implements StorageOperation {
+ static class FinishRebalanceStorageOperation extends StorageOperation {
+ @Override
+ String inProcessErrorMessage(String storageInfo) {
+ return "Storage in the process of finishing a rebalance: [" +
storageInfo + ']';
+ }
}
/**
* Storage cleanup operation.
*/
- class CleanupStorageOperation implements StorageOperation {
+ static class CleanupStorageOperation extends StorageOperation {
+ @Override
+ String inProcessErrorMessage(String storageInfo) {
+ return "Storage is in process of being cleaned up: [" +
storageInfo + ']';
+ }
+ }
+
+ /**
+ * Storage close operation.
+ */
+ static class CloseStorageOperation extends StorageOperation {
+ @Override
+ String inProcessErrorMessage(String storageInfo) {
+ return "Storage is in the process of closing: [" + storageInfo +
']';
+ }
}
}
diff --git
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/MvPartitionStoragesTest.java
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/MvPartitionStoragesTest.java
index aed8c7d2e5..345c070f1e 100644
---
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/MvPartitionStoragesTest.java
+++
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/MvPartitionStoragesTest.java
@@ -24,7 +24,7 @@ import static
org.apache.ignite.internal.testframework.matchers.CompletableFutur
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willFailFast;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willTimeoutFast;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -35,6 +35,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -53,7 +54,7 @@ import org.junit.jupiter.api.function.Executable;
*/
@ExtendWith(ConfigurationExtension.class)
public class MvPartitionStoragesTest {
- @InjectConfiguration
+ @InjectConfiguration("mock.partitions = 10")
private TableConfiguration tableConfig;
private MvPartitionStorages<MvPartitionStorage> mvPartitionStorages;
@@ -312,7 +313,7 @@ public class MvPartitionStoragesTest {
CompletableFuture<Void> finishStartRebalanceMvStorage = new
CompletableFuture<>();
CompletableFuture<?> startRebalanceFuture = runAsync(() ->
- assertThat(mvPartitionStorages.startRebalace(0, mvStorage -> {
+ assertThat(mvPartitionStorages.startRebalance(0, mvStorage -> {
startStartRebalanceMvStorage.complete(null);
return finishStartRebalanceMvStorage;
@@ -354,7 +355,7 @@ public class MvPartitionStoragesTest {
// What if there is an error during the operation?
assertThat(
- mvPartitionStorages.startRebalace(0, mvStorage ->
failedFuture(new RuntimeException("from test"))),
+ mvPartitionStorages.startRebalance(0, mvStorage ->
failedFuture(new RuntimeException("from test"))),
willFailFast(RuntimeException.class)
);
@@ -421,7 +422,7 @@ public class MvPartitionStoragesTest {
invokeAbortFunction.set(false);
assertThat(
- mvPartitionStorages.startRebalace(0, mvStorage ->
failedFuture(new RuntimeException("from test"))),
+ mvPartitionStorages.startRebalance(0, mvStorage ->
failedFuture(new RuntimeException("from test"))),
willFailFast(RuntimeException.class)
);
@@ -512,7 +513,7 @@ public class MvPartitionStoragesTest {
assertThat(abortRebalanceMvStorage(0), willCompleteSuccessfully());
assertThat(
- mvPartitionStorages.startRebalace(0, mvStorage ->
failedFuture(new RuntimeException("from test"))),
+ mvPartitionStorages.startRebalance(0, mvStorage ->
failedFuture(new RuntimeException("from test"))),
willFailFast(RuntimeException.class)
);
@@ -520,49 +521,132 @@ public class MvPartitionStoragesTest {
}
@Test
- void testGetAllForClose() {
- MvPartitionStorage storage0 = mock(MvPartitionStorage.class);
- MvPartitionStorage storage1 = mock(MvPartitionStorage.class);
+ void testGetAllForCloseOrDestroy() {
+ CompletableFuture<MvPartitionStorage> mvStorage0 = createMvStorage(0);
+ CompletableFuture<MvPartitionStorage> mvStorage1 = createMvStorage(1);
+ CompletableFuture<MvPartitionStorage> mvStorage2 = createMvStorage(2);
+ CompletableFuture<MvPartitionStorage> mvStorage3 = createMvStorage(3);
+ CompletableFuture<MvPartitionStorage> mvStorage4 = createMvStorage(4);
+ CompletableFuture<MvPartitionStorage> mvStorage5 = createMvStorage(5);
- assertThat(mvPartitionStorages.create(0, partId -> storage0),
willCompleteSuccessfully());
- assertThat(mvPartitionStorages.create(1, partId -> storage1),
willCompleteSuccessfully());
+ assertThat(mvStorage0, willCompleteSuccessfully());
+ assertThat(mvStorage1, willCompleteSuccessfully());
+ assertThat(mvStorage2, willCompleteSuccessfully());
+ assertThat(mvStorage3, willCompleteSuccessfully());
+ assertThat(mvStorage4, willCompleteSuccessfully());
+ assertThat(mvStorage5, willCompleteSuccessfully());
- assertThat(mvPartitionStorages.getAllForClose(), contains(storage0,
storage1));
+ assertThat(destroyMvStorage(1), willCompleteSuccessfully());
+ assertThat(clearMvStorage(2), willCompleteSuccessfully());
+ assertThat(startRebalanceMvStorage(3), willCompleteSuccessfully());
+
+ assertThat(startRebalanceMvStorage(4), willCompleteSuccessfully());
+ assertThat(abortRebalanceMvStorage(4), willCompleteSuccessfully());
+
+ assertThat(startRebalanceMvStorage(4), willCompleteSuccessfully());
+ assertThat(finishRebalanceMvStorage(4), willCompleteSuccessfully());
+
+ CompletableFuture<List<MvPartitionStorage>> allForCloseOrDestroy =
mvPartitionStorages.getAllForCloseOrDestroy();
+
+ assertThat(allForCloseOrDestroy, willCompleteSuccessfully());
+
+ // One less, since we destroyed 1 storage.
+ assertThat(
+ allForCloseOrDestroy.join(),
+ containsInAnyOrder(mvStorage0.join(), mvStorage2.join(),
mvStorage3.join(), mvStorage4.join(), mvStorage5.join())
+ );
+
+ // What happens if we try to perform operations on storages?
+ assertThrowsWithMessage(StorageException.class, () ->
createMvStorage(6), "Storage is in the process of closing");
+ assertThrowsWithMessage(StorageException.class, () ->
destroyMvStorage(0), "Storage does not exist");
+ assertThrows(StorageException.class, () -> clearMvStorage(0), "Storage
does not exist");
+ assertThrows(StorageException.class, () -> startRebalanceMvStorage(0),
"Storage does not exist");
+ assertThrows(StorageException.class, () -> abortRebalanceMvStorage(0),
"Storage does not exist");
+ assertThrows(StorageException.class, () ->
finishRebalanceMvStorage(0), "Storage does not exist");
}
@Test
- void testDestroyAll() {
- MvPartitionStorage storage0 = mock(MvPartitionStorage.class);
- MvPartitionStorage storage1 = mock(MvPartitionStorage.class);
-
- assertThat(mvPartitionStorages.create(0, partId -> storage0),
willCompleteSuccessfully());
- assertThat(mvPartitionStorages.create(1, partId -> storage1),
willCompleteSuccessfully());
+ void testWaitOperationOnGetAllForCloseOrDestroy() {
+ CompletableFuture<Void> createStorageOperationFuture = new
CompletableFuture<>();
+ CompletableFuture<Void> destroyStorageOperationFuture = new
CompletableFuture<>();
+ CompletableFuture<Void> clearStorageOperationFuture = new
CompletableFuture<>();
+ CompletableFuture<Void> startRebalanceStorageOperationFuture = new
CompletableFuture<>();
+ CompletableFuture<Void> abortRebalanceStorageOperationFuture = new
CompletableFuture<>();
+ CompletableFuture<Void> finishRebalanceStorageOperationFuture = new
CompletableFuture<>();
+
+ CompletableFuture<?> create0StorageFuture = runAsync(() ->
mvPartitionStorages.create(0, partId -> {
+ assertThat(createStorageOperationFuture,
willCompleteSuccessfully());
+
+ return mock(MvPartitionStorage.class);
+ }));
+
+ assertThat(createMvStorage(1), willCompleteSuccessfully());
+ assertThat(createMvStorage(2), willCompleteSuccessfully());
+ assertThat(createMvStorage(3), willCompleteSuccessfully());
+ assertThat(createMvStorage(4), willCompleteSuccessfully());
+ assertThat(createMvStorage(5), willCompleteSuccessfully());
+
+ CompletableFuture<Void> destroy1StorageFuture =
mvPartitionStorages.destroy(1, storage -> destroyStorageOperationFuture);
+ CompletableFuture<Void> clear2StorageFuture =
mvPartitionStorages.clear(2, storage -> clearStorageOperationFuture);
+
+ CompletableFuture<Void> startRebalance3StorageFuture =
mvPartitionStorages.startRebalance(
+ 3,
+ storage -> startRebalanceStorageOperationFuture
+ );
- CompletableFuture<Void> startDestroyMvStorage0Future = new
CompletableFuture<>();
- CompletableFuture<Void> finishDestroyMvStorage0Future = new
CompletableFuture<>();
+ assertThat(startRebalanceMvStorage(4), willCompleteSuccessfully());
+ assertThat(startRebalanceMvStorage(5), willCompleteSuccessfully());
- CompletableFuture<?> destroyMvStorage0Future = runAsync(() ->
- assertThat(mvPartitionStorages.destroy(0, mvStorage -> {
- startDestroyMvStorage0Future.complete(null);
+ CompletableFuture<Void> abortRebalance4StorageFuture =
mvPartitionStorages.abortRebalance(
+ 4,
+ storage -> abortRebalanceStorageOperationFuture
+ );
- return finishDestroyMvStorage0Future;
- }), willCompleteSuccessfully())
+ CompletableFuture<Void> finishRebalance5StorageFuture =
mvPartitionStorages.finishRebalance(
+ 5,
+ storage -> finishRebalanceStorageOperationFuture
);
- assertThat(startDestroyMvStorage0Future, willCompleteSuccessfully());
+ CompletableFuture<List<MvPartitionStorage>> allForCloseOrDestroyFuture
= mvPartitionStorages.getAllForCloseOrDestroy();
- CompletableFuture<Void> destroyAllMvStoragesFuture =
mvPartitionStorages.destroyAll(mvStorage -> {
- assertSame(mvStorage, storage1);
+ assertThat(allForCloseOrDestroyFuture, willTimeoutFast());
- return completedFuture(null);
- });
+ // Let's finish creating the storage.
+ createStorageOperationFuture.complete(null);
+
+ assertThat(create0StorageFuture, willCompleteSuccessfully());
+ assertThat(allForCloseOrDestroyFuture, willTimeoutFast());
+
+ // Let's finish destroying the storage.
+ destroyStorageOperationFuture.complete(null);
+
+ assertThat(destroy1StorageFuture, willCompleteSuccessfully());
+ assertThat(allForCloseOrDestroyFuture, willTimeoutFast());
+
+ // Let's finish clearing the storage.
+ clearStorageOperationFuture.complete(null);
+
+ assertThat(clear2StorageFuture, willCompleteSuccessfully());
+ assertThat(allForCloseOrDestroyFuture, willTimeoutFast());
+
+ // Let's finish starting rebalance the storage.
+ startRebalanceStorageOperationFuture.complete(null);
+
+ assertThat(startRebalance3StorageFuture, willCompleteSuccessfully());
+ assertThat(allForCloseOrDestroyFuture, willTimeoutFast());
+
+ // Let's finish aborting rebalance the storage.
+ abortRebalanceStorageOperationFuture.complete(null);
+
+ assertThat(abortRebalance4StorageFuture, willCompleteSuccessfully());
+ assertThat(allForCloseOrDestroyFuture, willTimeoutFast());
- assertThat(destroyAllMvStoragesFuture, willTimeoutFast());
+ // Let's finish finishing rebalance the storage.
+ finishRebalanceStorageOperationFuture.complete(null);
- finishDestroyMvStorage0Future.complete(null);
+ assertThat(finishRebalance5StorageFuture, willCompleteSuccessfully());
- assertThat(destroyMvStorage0Future, willCompleteSuccessfully());
- assertThat(destroyAllMvStoragesFuture, willCompleteSuccessfully());
+ assertThat(allForCloseOrDestroyFuture, willCompleteSuccessfully());
}
private MvPartitionStorage getMvStorage(int partitionId) {
@@ -582,7 +666,7 @@ public class MvPartitionStoragesTest {
}
private CompletableFuture<Void> startRebalanceMvStorage(int partitionId) {
- return mvPartitionStorages.startRebalace(partitionId, mvStorage ->
completedFuture(null));
+ return mvPartitionStorages.startRebalance(partitionId, mvStorage ->
completedFuture(null));
}
private CompletableFuture<Void> abortRebalanceMvStorage(int partitionId) {
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index b940f280f1..21e04bcf5a 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -711,6 +711,24 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
}
}
+ @Test
+ void testCloseStartedRebalance() {
+ MvPartitionStorage mvPartitionStorage =
getOrCreateMvPartition(PARTITION_ID);
+
+ assertThat(tableStorage.startRebalancePartition(PARTITION_ID),
willCompleteSuccessfully());
+
+ assertDoesNotThrow(mvPartitionStorage::close);
+ }
+
+ @Test
+ void testDestroyStartedRebalance() {
+ getOrCreateMvPartition(PARTITION_ID);
+
+ assertThat(tableStorage.startRebalancePartition(PARTITION_ID),
willCompleteSuccessfully());
+
+ assertThat(tableStorage.destroyPartition(PARTITION_ID),
willCompleteSuccessfully());
+ }
+
private static void createTestIndexes(TablesConfiguration tablesConfig) {
List<IndexDefinition> indexDefinitions = List.of(
SchemaBuilders.sortedIndex(SORTED_INDEX_NAME)
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index a5c435df51..f88c0b4ecb 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -551,8 +551,6 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
@Override
public void close() {
- assert !rebalance;
-
closed = true;
clear0();
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
index a049b4da13..251af1fde2 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.storage.impl;
+import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.storage.util.StorageUtils.createMissingMvPartitionErrorMessage;
import static org.mockito.Mockito.spy;
@@ -212,12 +213,13 @@ public class TestMvTableStorage implements MvTableStorage
{
public CompletableFuture<Void> destroy() {
stop();
- return mvPartitionStorages.destroyAll(this::destroyPartition);
+ return mvPartitionStorages.getAllForCloseOrDestroy()
+ .thenCompose(mvStorages ->
allOf(mvStorages.stream().map(this::destroyPartition).toArray(CompletableFuture[]::new)));
}
@Override
public CompletableFuture<Void> startRebalancePartition(int partitionId) {
- return mvPartitionStorages.startRebalace(partitionId,
mvPartitionStorage -> {
+ return mvPartitionStorages.startRebalance(partitionId,
mvPartitionStorage -> {
mvPartitionStorage.startRebalance();
testHashIndexStorageStream(partitionId).forEach(TestHashIndexStorage::startRebalance);
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
index 7141005556..697cfd6b48 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
@@ -17,13 +17,16 @@
package org.apache.ignite.internal.storage.pagememory;
+import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.storage.MvPartitionStorage.REBALANCE_IN_PROGRESS;
import static
org.apache.ignite.internal.storage.util.StorageUtils.createMissingMvPartitionErrorMessage;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.ignite.internal.pagememory.DataRegion;
@@ -103,7 +106,11 @@ public abstract class AbstractPageMemoryTableStorage
implements MvTableStorage {
busyLock.block();
try {
-
IgniteUtils.closeAll(mvPartitionStorages.getAllForClose().stream().map(mvPartitionStorage
-> mvPartitionStorage::close));
+ CompletableFuture<List<AbstractPageMemoryMvPartitionStorage>>
allForCloseOrDestroy
+ = mvPartitionStorages.getAllForCloseOrDestroy();
+
+ // 10 seconds is taken by analogy with shutdown of thread pool, in
general this should be fairly fast.
+ IgniteUtils.closeAllManually(allForCloseOrDestroy.get(10,
TimeUnit.SECONDS).stream());
} catch (Exception e) {
throw new StorageException("Failed to stop PageMemory table
storage: " + getTableName(), e);
}
@@ -117,7 +124,8 @@ public abstract class AbstractPageMemoryTableStorage
implements MvTableStorage {
busyLock.block();
- return mvPartitionStorages.destroyAll(this::destroyMvPartitionStorage)
+ return mvPartitionStorages.getAllForCloseOrDestroy()
+ .thenCompose(storages ->
allOf(storages.stream().map(this::destroyMvPartitionStorage).toArray(CompletableFuture[]::new)))
.whenComplete((unused, throwable) -> {
if (throwable == null) {
finishDestruction();
@@ -209,7 +217,7 @@ public abstract class AbstractPageMemoryTableStorage
implements MvTableStorage {
@Override
public CompletableFuture<Void> startRebalancePartition(int partitionId) {
- return inBusyLock(busyLock, () ->
mvPartitionStorages.startRebalace(partitionId, mvPartitionStorage -> {
+ return inBusyLock(busyLock, () ->
mvPartitionStorages.startRebalance(partitionId, mvPartitionStorage -> {
mvPartitionStorage.startRebalance();
return clearStorageAndUpdateDataStructures(mvPartitionStorage)
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 5516a5f127..56c529375f 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -751,11 +751,9 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
* @param goingToDestroy If the closure is in preparation for destruction.
*/
private void close(boolean goingToDestroy) {
- if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {
- StorageState state = this.state.get();
-
- assert state == StorageState.CLOSED :
IgniteStringFormatter.format("{}, state={}", createStorageInfo(), state);
+ StorageState previous = state.getAndSet(StorageState.CLOSED);
+ if (previous == StorageState.CLOSED) {
return;
}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 31278c1a90..452579db0d 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -1017,11 +1017,9 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
@Override
public void close() {
- if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {
- StorageState state = this.state.get();
-
- assert state == StorageState.CLOSED : state;
+ StorageState previous = state.getAndSet(StorageState.CLOSED);
+ if (previous == StorageState.CLOSED) {
return;
}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index f7b75eb41f..745bd6dbfb 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -41,6 +41,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -358,19 +359,23 @@ public class RocksDbTableStorage implements
MvTableStorage {
resources.add(writeOptions);
- mvPartitionStorages.getAllForClose().forEach(mvPartitionStorage ->
resources.add(mvPartitionStorage::close));
-
- for (HashIndex index : hashIndices.values()) {
- resources.add(index::close);
- }
+ try {
+ mvPartitionStorages
+ .getAllForCloseOrDestroy()
+ // 10 seconds is taken by analogy with shutdown of thread
pool, in general this should be fairly fast.
+ .get(10, TimeUnit.SECONDS)
+ .forEach(mvPartitionStorage ->
resources.add(mvPartitionStorage::close));
+
+ for (HashIndex index : hashIndices.values()) {
+ resources.add(index::close);
+ }
- for (SortedIndex index : sortedIndices.values()) {
- resources.add(index::close);
- }
+ for (SortedIndex index : sortedIndices.values()) {
+ resources.add(index::close);
+ }
- Collections.reverse(resources);
+ Collections.reverse(resources);
- try {
IgniteUtils.closeAll(resources);
} catch (Exception e) {
throw new StorageException("Failed to stop RocksDB table storage:
" + getTableName(), e);
@@ -605,7 +610,7 @@ public class RocksDbTableStorage implements MvTableStorage {
@Override
public CompletableFuture<Void> startRebalancePartition(int partitionId) {
- return inBusyLock(busyLock, () ->
mvPartitionStorages.startRebalace(partitionId, mvPartitionStorage -> {
+ return inBusyLock(busyLock, () ->
mvPartitionStorages.startRebalance(partitionId, mvPartitionStorage -> {
try (WriteBatch writeBatch = new WriteBatch()) {
mvPartitionStorage.startRebalance(writeBatch);