This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 26dd41a0b1 IGNITE-19648 Failed to cancel rebalance (#2155)
26dd41a0b1 is described below
commit 26dd41a0b1a2a3b091c92f46ec2623869993c3fe
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Wed Jun 7 11:50:21 2023 +0300
IGNITE-19648 Failed to cancel rebalance (#2155)
---
.../ignite/internal/storage/StorageException.java | 12 ++++
.../storage/StorageRebalanceException.java | 11 ++++
.../internal/storage/util/MvPartitionStorages.java | 75 ++++++++++++++--------
.../internal/storage/util/StorageOperation.java | 29 +++++++++
.../storage/util/MvPartitionStoragesTest.java | 38 ++++++++++-
5 files changed, 137 insertions(+), 28 deletions(-)
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
index c70d17c08d..bd279fe6c9 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
@@ -112,4 +112,16 @@ public class StorageException extends
IgniteInternalException {
public StorageException(String messagePattern, Object... params) {
this(IgniteStringFormatter.format(messagePattern, params));
}
+
+ /**
+ * Constructor.
+ *
+ * @param code Full error code.
+ * @param messagePattern Error message pattern.
+ * @param params Error message params.
+ * @see IgniteStringFormatter#format(String, Object...)
+ */
+ public StorageException(int code, String messagePattern, Object... params)
{
+ this(code, IgniteStringFormatter.format(messagePattern, params));
+ }
}
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageRebalanceException.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageRebalanceException.java
index 85958709fe..cd2c9ebf31 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageRebalanceException.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageRebalanceException.java
@@ -64,4 +64,15 @@ public class StorageRebalanceException extends
StorageException {
public StorageRebalanceException(String messagePattern, Throwable cause,
Object... params) {
super(Storage.STORAGE_REBALANCE_ERR, messagePattern, cause, params);
}
+
+ /**
+ * Constructor.
+ *
+ * @param messagePattern Error message pattern.
+ * @param params Error message params.
+ * @see IgniteStringFormatter#format(String, Object...)
+ */
+ public StorageRebalanceException(String messagePattern, Object... params) {
+ super(Storage.STORAGE_REBALANCE_ERR, messagePattern, params);
+ }
}
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/MvPartitionStorages.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/MvPartitionStorages.java
index 6a726ce6be..d8360fa6a3 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/MvPartitionStorages.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/MvPartitionStorages.java
@@ -130,7 +130,7 @@ public class MvPartitionStorages<T extends
MvPartitionStorage> {
}).whenComplete((storage, throwable) ->
operationByPartitionId.compute(partitionId, (partId, operation) -> {
assert operation instanceof CreateStorageOperation :
createStorageInfo(partitionId) + ", op=" + operation;
- return completeOperation(operation);
+ return nextOperationIfAvailable(operation);
}));
}
@@ -161,7 +161,7 @@ public class MvPartitionStorages<T extends
MvPartitionStorage> {
operationByPartitionId.compute(partitionId, (partId,
operation) -> {
assert operation instanceof DestroyStorageOperation :
createStorageInfo(partitionId) + ", op=" + operation;
- return completeOperation(operation);
+ return nextOperationIfAvailable(operation);
});
if (throwable == null) {
@@ -199,7 +199,7 @@ public class MvPartitionStorages<T extends
MvPartitionStorage> {
operationByPartitionId.compute(partitionId, (partId,
operation) -> {
assert operation instanceof
CleanupStorageOperation : createStorageInfo(partitionId) + ", op=" + operation;
- return completeOperation(operation);
+ return nextOperationIfAvailable(operation);
})
);
}
@@ -215,19 +215,21 @@ public class MvPartitionStorages<T extends
MvPartitionStorage> {
* @throws StorageRebalanceException If rebalancing is already in progress.
*/
public CompletableFuture<Void> startRebalance(int partitionId, Function<T,
CompletableFuture<Void>> startRebalanceStorageFunction) {
- operationByPartitionId.compute(partitionId, (partId, operation) -> {
- checkStorageExistsForRebalance(partitionId);
+ StartRebalanceStorageOperation startRebalanceOperation =
(StartRebalanceStorageOperation) operationByPartitionId.compute(
+ partitionId,
+ (partId, operation) -> {
+ checkStorageExistsForRebalance(partitionId);
- if (operation != null) {
- throwExceptionDependingOnOperationForRebalance(operation,
partitionId);
- }
+ if (operation != null) {
+
throwExceptionDependingOnOperationForRebalance(operation, partitionId);
+ }
- if (rebalanceFutureByPartitionId.containsKey(partitionId)) {
- throw new
StorageRebalanceException(createStorageInProgressOfRebalanceErrorMessage(partitionId));
- }
+ if (rebalanceFutureByPartitionId.containsKey(partitionId))
{
+ throw new
StorageRebalanceException(createStorageInProgressOfRebalanceErrorMessage(partitionId));
+ }
- return new StartRebalanceStorageOperation();
- });
+ return new StartRebalanceStorageOperation();
+ });
return completedFuture(null)
.thenCompose(unused -> {
@@ -238,14 +240,16 @@ public class MvPartitionStorages<T extends
MvPartitionStorage> {
assert old == null : createStorageInfo(partitionId);
return startRebalanceFuture;
- }).whenComplete((unused, throwable) ->
- operationByPartitionId.compute(partitionId, (partId,
operation) -> {
- assert operation instanceof
StartRebalanceStorageOperation :
- createStorageInfo(partitionId) + ", op=" +
operation;
+ }).whenComplete((unused, throwable) -> {
+ operationByPartitionId.compute(partitionId, (partId,
operation) -> {
+ assert operation instanceof
StartRebalanceStorageOperation : createStorageInfo(partitionId) + ", op=" +
operation;
- return completeOperation(operation);
- })
- );
+ return nextOperationIfAvailable(operation);
+ });
+
+ // Even if an error occurs, we must be able to abort the
rebalance, so this future is always completed normally and never reports the error.
+
startRebalanceOperation.getStartRebalanceFuture().complete(null);
+ });
}
/**
@@ -258,9 +262,17 @@ public class MvPartitionStorages<T extends
MvPartitionStorage> {
* @throws StorageRebalanceException If the storage does not exist or
another operation is already in progress.
*/
public CompletableFuture<Void> abortRebalance(int partitionId, Function<T,
CompletableFuture<Void>> abortRebalanceStorageFunction) {
- operationByPartitionId.compute(partitionId, (partId, operation) -> {
+ StorageOperation storageOperation =
operationByPartitionId.compute(partitionId, (partId, operation) -> {
checkStorageExistsForRebalance(partitionId);
+ if (operation instanceof StartRebalanceStorageOperation) {
+ if (!((StartRebalanceStorageOperation)
operation).setAbortOperation(new AbortRebalanceStorageOperation())) {
+ throw new StorageRebalanceException("Rebalance abort is
already planned: [{}]", createStorageInfo(partitionId));
+ }
+
+ return operation;
+ }
+
if (operation != null) {
throwExceptionDependingOnOperationForRebalance(operation,
partitionId);
}
@@ -268,7 +280,10 @@ public class MvPartitionStorages<T extends
MvPartitionStorage> {
return new AbortRebalanceStorageOperation();
});
- return completedFuture(null)
+ CompletableFuture<?> startRebalanceFuture = storageOperation
instanceof StartRebalanceStorageOperation
+ ? ((StartRebalanceStorageOperation)
storageOperation).getStartRebalanceFuture() : completedFuture(null);
+
+ return startRebalanceFuture
.thenCompose(unused -> {
CompletableFuture<Void> rebalanceFuture =
rebalanceFutureByPartitionId.remove(partitionId);
@@ -284,7 +299,7 @@ public class MvPartitionStorages<T extends
MvPartitionStorage> {
assert operation instanceof
AbortRebalanceStorageOperation :
createStorageInfo(partitionId) + ", op=" +
operation;
- return completeOperation(operation);
+ return nextOperationIfAvailable(operation);
})
);
}
@@ -326,7 +341,7 @@ public class MvPartitionStorages<T extends
MvPartitionStorage> {
assert operation instanceof
FinishRebalanceStorageOperation :
createStorageInfo(partitionId) + ", op=" +
operation;
- return completeOperation(operation);
+ return nextOperationIfAvailable(operation);
})
);
}
@@ -408,14 +423,22 @@ public class MvPartitionStorages<T extends
MvPartitionStorage> {
return "Storage in the process of rebalance: [" +
createStorageInfo(partitionId) + ']';
}
- private static @Nullable StorageOperation
completeOperation(StorageOperation operation) {
+ private static @Nullable StorageOperation
nextOperationIfAvailable(StorageOperation operation) {
operation.operationFuture().complete(null);
if (operation.isFinalOperation()) {
return operation;
}
- return operation instanceof DestroyStorageOperation ?
((DestroyStorageOperation) operation).getCreateStorageOperation() : null;
+ if (operation instanceof DestroyStorageOperation) {
+ return ((DestroyStorageOperation)
operation).getCreateStorageOperation();
+ }
+
+ if (operation instanceof StartRebalanceStorageOperation) {
+ return ((StartRebalanceStorageOperation)
operation).getAbortRebalanceOperation();
+ }
+
+ return null;
}
/**
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageOperation.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageOperation.java
index 81c1a57e45..a9599107cc 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageOperation.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageOperation.java
@@ -109,6 +109,35 @@ abstract class StorageOperation {
* Storage rebalancing start operation.
*/
static class StartRebalanceStorageOperation extends StorageOperation {
+ /** Used if the rebalance abortion was called before the rebalance
start was completed. */
+ private final AtomicReference<AbortRebalanceStorageOperation>
abortRebalanceOperation = new AtomicReference<>();
+
+ private final CompletableFuture<Void> startRebalanceFuture = new
CompletableFuture<>();
+
+ /**
+ * Attempts to set the abort rebalance operation.
+ *
+ * @param abortRebalanceOperation Abort rebalance operation.
+ * @return {@code true} if the operation was set by the current method
invocation, {@code false} if an abort operation had already been set by another invocation.
+ */
+ boolean setAbortOperation(AbortRebalanceStorageOperation
abortRebalanceOperation) {
+ return this.abortRebalanceOperation.compareAndSet(null,
abortRebalanceOperation);
+ }
+
+ /**
+ * Returns the abort rebalance operation that was {@link
#setAbortOperation(AbortRebalanceStorageOperation) set}, or {@code null} if
none has been set.
+ */
+ @Nullable AbortRebalanceStorageOperation getAbortRebalanceOperation() {
+ return abortRebalanceOperation.get();
+ }
+
+ /**
+ * Returns the start rebalance future.
+ */
+ CompletableFuture<Void> getStartRebalanceFuture() {
+ return startRebalanceFuture;
+ }
+
@Override
String inProcessErrorMessage(String storageInfo) {
return "Storage in the process of starting a rebalance: [" +
storageInfo + ']';
diff --git
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/MvPartitionStoragesTest.java
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/MvPartitionStoragesTest.java
index 5b7be23e5b..91202f5293 100644
---
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/MvPartitionStoragesTest.java
+++
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/MvPartitionStoragesTest.java
@@ -326,8 +326,6 @@ public class MvPartitionStoragesTest {
assertThrowsWithMessage(StorageRebalanceException.class, () ->
startRebalanceMvStorage(0),
"Storage in the process of starting a rebalance");
- assertThrowsWithMessage(StorageRebalanceException.class, () ->
abortRebalanceMvStorage(0),
- "Storage in the process of starting a rebalance");
assertThrowsWithMessage(StorageRebalanceException.class, () ->
finishRebalanceMvStorage(0),
"Storage in the process of starting a rebalance");
@@ -651,6 +649,42 @@ public class MvPartitionStoragesTest {
assertThat(allForCloseOrDestroyFuture, willCompleteSuccessfully());
}
+ @Test
+ void testAbortRebalanceBeforeFinishStartRebalance() {
+ assertThat(createMvStorage(0), willCompleteSuccessfully());
+
+ CompletableFuture<Void> rebalanceFuture = new CompletableFuture<>();
+
+ CompletableFuture<Void> startRebalanceFuture =
mvPartitionStorages.startRebalance(0, mvPartitionStorage -> rebalanceFuture);
+
+ CompletableFuture<Void> startAbortRebalanceFuture = new
CompletableFuture<>();
+ CompletableFuture<Void> finishAbortRebalanceFuture = new
CompletableFuture<>();
+
+ CompletableFuture<Void> abortRebalanceFuture =
mvPartitionStorages.abortRebalance(0, mvPartitionStorage -> {
+ startAbortRebalanceFuture.complete(null);
+
+ return finishAbortRebalanceFuture;
+ });
+
+ // Make sure that the abortion of the rebalance does not start until
the start of the rebalance is over.
+ assertThat(startAbortRebalanceFuture, willTimeoutFast());
+
+ // You can't abort rebalancing a second time.
+ assertThrowsWithMessage(StorageRebalanceException.class, () ->
abortRebalanceMvStorage(0), "Rebalance abort is already planned");
+
+ rebalanceFuture.complete(null);
+
+ // Let's make sure that the rebalancing abortion will start only after
the rebalancing start is completed.
+ assertThat(startRebalanceFuture, willCompleteSuccessfully());
+ assertThat(startAbortRebalanceFuture, willCompleteSuccessfully());
+ assertThat(abortRebalanceFuture, willTimeoutFast());
+
+ // Let's finish the rebalancing abortion.
+ finishAbortRebalanceFuture.complete(null);
+
+ assertThat(abortRebalanceFuture, willCompleteSuccessfully());
+ }
+
private MvPartitionStorage getMvStorage(int partitionId) {
return mvPartitionStorages.get(partitionId);
}