This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 26dd41a0b1 IGNITE-19648 Failed to cancel rebalance (#2155)
26dd41a0b1 is described below

commit 26dd41a0b1a2a3b091c92f46ec2623869993c3fe
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Wed Jun 7 11:50:21 2023 +0300

    IGNITE-19648 Failed to cancel rebalance (#2155)
---
 .../ignite/internal/storage/StorageException.java  | 12 ++++
 .../storage/StorageRebalanceException.java         | 11 ++++
 .../internal/storage/util/MvPartitionStorages.java | 75 ++++++++++++++--------
 .../internal/storage/util/StorageOperation.java    | 29 +++++++++
 .../storage/util/MvPartitionStoragesTest.java      | 38 ++++++++++-
 5 files changed, 137 insertions(+), 28 deletions(-)

diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
index c70d17c08d..bd279fe6c9 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
@@ -112,4 +112,16 @@ public class StorageException extends IgniteInternalException {
     public StorageException(String messagePattern, Object... params) {
         this(IgniteStringFormatter.format(messagePattern, params));
     }
+
+    /**
+     * Constructor.
+     *
+     * @param code Full error code.
+     * @param messagePattern Error message pattern.
+     * @param params Error message params.
+     * @see IgniteStringFormatter#format(String, Object...)
+     */
+    public StorageException(int code, String messagePattern, Object... params) {
+        this(code, IgniteStringFormatter.format(messagePattern, params));
+    }
 }
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageRebalanceException.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageRebalanceException.java
index 85958709fe..cd2c9ebf31 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageRebalanceException.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageRebalanceException.java
@@ -64,4 +64,15 @@ public class StorageRebalanceException extends StorageException {
     public StorageRebalanceException(String messagePattern, Throwable cause, Object... params) {
         super(Storage.STORAGE_REBALANCE_ERR, messagePattern, cause, params);
     }
+
+    /**
+     * Constructor.
+     *
+     * @param messagePattern Error message pattern.
+     * @param params Error message params.
+     * @see IgniteStringFormatter#format(String, Object...)
+     */
+    public StorageRebalanceException(String messagePattern, Object... params) {
+        super(Storage.STORAGE_REBALANCE_ERR, messagePattern, params);
+    }
 }
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/MvPartitionStorages.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/MvPartitionStorages.java
index 6a726ce6be..d8360fa6a3 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/MvPartitionStorages.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/MvPartitionStorages.java
@@ -130,7 +130,7 @@ public class MvPartitionStorages<T extends MvPartitionStorage> {
         }).whenComplete((storage, throwable) -> 
operationByPartitionId.compute(partitionId, (partId, operation) -> {
             assert operation instanceof CreateStorageOperation : 
createStorageInfo(partitionId) + ", op=" + operation;
 
-            return completeOperation(operation);
+            return nextOperationIfAvailable(operation);
         }));
     }
 
@@ -161,7 +161,7 @@ public class MvPartitionStorages<T extends MvPartitionStorage> {
                     operationByPartitionId.compute(partitionId, (partId, 
operation) -> {
                         assert operation instanceof DestroyStorageOperation : 
createStorageInfo(partitionId) + ", op=" + operation;
 
-                        return completeOperation(operation);
+                        return nextOperationIfAvailable(operation);
                     });
 
                     if (throwable == null) {
@@ -199,7 +199,7 @@ public class MvPartitionStorages<T extends MvPartitionStorage> {
                         operationByPartitionId.compute(partitionId, (partId, 
operation) -> {
                             assert operation instanceof 
CleanupStorageOperation : createStorageInfo(partitionId) + ", op=" + operation;
 
-                            return completeOperation(operation);
+                            return nextOperationIfAvailable(operation);
                         })
                 );
     }
@@ -215,19 +215,21 @@ public class MvPartitionStorages<T extends MvPartitionStorage> {
      * @throws StorageRebalanceException If rebalancing is already in progress.
      */
     public CompletableFuture<Void> startRebalance(int partitionId, Function<T, 
CompletableFuture<Void>> startRebalanceStorageFunction) {
-        operationByPartitionId.compute(partitionId, (partId, operation) -> {
-            checkStorageExistsForRebalance(partitionId);
+        StartRebalanceStorageOperation startRebalanceOperation = 
(StartRebalanceStorageOperation) operationByPartitionId.compute(
+                partitionId,
+                (partId, operation) -> {
+                    checkStorageExistsForRebalance(partitionId);
 
-            if (operation != null) {
-                throwExceptionDependingOnOperationForRebalance(operation, 
partitionId);
-            }
+                    if (operation != null) {
+                        
throwExceptionDependingOnOperationForRebalance(operation, partitionId);
+                    }
 
-            if (rebalanceFutureByPartitionId.containsKey(partitionId)) {
-                throw new 
StorageRebalanceException(createStorageInProgressOfRebalanceErrorMessage(partitionId));
-            }
+                    if (rebalanceFutureByPartitionId.containsKey(partitionId)) 
{
+                        throw new 
StorageRebalanceException(createStorageInProgressOfRebalanceErrorMessage(partitionId));
+                    }
 
-            return new StartRebalanceStorageOperation();
-        });
+                    return new StartRebalanceStorageOperation();
+                });
 
         return completedFuture(null)
                 .thenCompose(unused -> {
@@ -238,14 +240,16 @@ public class MvPartitionStorages<T extends MvPartitionStorage> {
                     assert old == null : createStorageInfo(partitionId);
 
                     return startRebalanceFuture;
-                }).whenComplete((unused, throwable) ->
-                        operationByPartitionId.compute(partitionId, (partId, 
operation) -> {
-                            assert operation instanceof 
StartRebalanceStorageOperation :
-                                    createStorageInfo(partitionId) + ", op=" + 
operation;
+                }).whenComplete((unused, throwable) -> {
+                    operationByPartitionId.compute(partitionId, (partId, 
operation) -> {
+                        assert operation instanceof 
StartRebalanceStorageOperation : createStorageInfo(partitionId) + ", op=" + 
operation;
 
-                            return completeOperation(operation);
-                        })
-                );
+                        return nextOperationIfAvailable(operation);
+                    });
+
+                    // Even if an error occurs, we must be able to abort the 
rebalance, so we do not report the error.
+                    
startRebalanceOperation.getStartRebalanceFuture().complete(null);
+                });
     }
 
     /**
@@ -258,9 +262,17 @@ public class MvPartitionStorages<T extends MvPartitionStorage> {
      * @throws StorageRebalanceException If the storage does not exist or 
another operation is already in progress.
      */
     public CompletableFuture<Void> abortRebalance(int partitionId, Function<T, 
CompletableFuture<Void>> abortRebalanceStorageFunction) {
-        operationByPartitionId.compute(partitionId, (partId, operation) -> {
+        StorageOperation storageOperation = 
operationByPartitionId.compute(partitionId, (partId, operation) -> {
             checkStorageExistsForRebalance(partitionId);
 
+            if (operation instanceof StartRebalanceStorageOperation) {
+                if (!((StartRebalanceStorageOperation) 
operation).setAbortOperation(new AbortRebalanceStorageOperation())) {
+                    throw new StorageRebalanceException("Rebalance abort is 
already planned: [{}]", createStorageInfo(partitionId));
+                }
+
+                return operation;
+            }
+
             if (operation != null) {
                 throwExceptionDependingOnOperationForRebalance(operation, 
partitionId);
             }
@@ -268,7 +280,10 @@ public class MvPartitionStorages<T extends MvPartitionStorage> {
             return new AbortRebalanceStorageOperation();
         });
 
-        return completedFuture(null)
+        CompletableFuture<?> startRebalanceFuture = storageOperation 
instanceof StartRebalanceStorageOperation
+                ? ((StartRebalanceStorageOperation) 
storageOperation).getStartRebalanceFuture() : completedFuture(null);
+
+        return startRebalanceFuture
                 .thenCompose(unused -> {
                     CompletableFuture<Void> rebalanceFuture = 
rebalanceFutureByPartitionId.remove(partitionId);
 
@@ -284,7 +299,7 @@ public class MvPartitionStorages<T extends MvPartitionStorage> {
                             assert operation instanceof 
AbortRebalanceStorageOperation :
                                     createStorageInfo(partitionId) + ", op=" + 
operation;
 
-                            return completeOperation(operation);
+                            return nextOperationIfAvailable(operation);
                         })
                 );
     }
@@ -326,7 +341,7 @@ public class MvPartitionStorages<T extends MvPartitionStorage> {
                             assert operation instanceof 
FinishRebalanceStorageOperation :
                                     createStorageInfo(partitionId) + ", op=" + 
operation;
 
-                            return completeOperation(operation);
+                            return nextOperationIfAvailable(operation);
                         })
                 );
     }
@@ -408,14 +423,22 @@ public class MvPartitionStorages<T extends MvPartitionStorage> {
         return "Storage in the process of rebalance: [" + 
createStorageInfo(partitionId) + ']';
     }
 
-    private static @Nullable StorageOperation 
completeOperation(StorageOperation operation) {
+    private static @Nullable StorageOperation 
nextOperationIfAvailable(StorageOperation operation) {
         operation.operationFuture().complete(null);
 
         if (operation.isFinalOperation()) {
             return operation;
         }
 
-        return operation instanceof DestroyStorageOperation ? 
((DestroyStorageOperation) operation).getCreateStorageOperation() : null;
+        if (operation instanceof DestroyStorageOperation) {
+            return ((DestroyStorageOperation) 
operation).getCreateStorageOperation();
+        }
+
+        if (operation instanceof StartRebalanceStorageOperation) {
+            return ((StartRebalanceStorageOperation) 
operation).getAbortRebalanceOperation();
+        }
+
+        return null;
     }
 
     /**
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageOperation.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageOperation.java
index 81c1a57e45..a9599107cc 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageOperation.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageOperation.java
@@ -109,6 +109,35 @@ abstract class StorageOperation {
      * Storage rebalancing start operation.
      */
     static class StartRebalanceStorageOperation extends StorageOperation {
+        /** Used if the rebalance abortion was called before the rebalance 
start was completed. */
+        private final AtomicReference<AbortRebalanceStorageOperation> 
abortRebalanceOperation = new AtomicReference<>();
+
+        private final CompletableFuture<Void> startRebalanceFuture = new 
CompletableFuture<>();
+
+        /**
+         * Attempts to set the abort rebalance operation.
+         *
+         * @param abortRebalanceOperation Abort rebalance operation.
+         * @return {@code true} if the operation was set by the current method 
invocation, {@code false} if by another method invocation.
+         */
+        boolean setAbortOperation(AbortRebalanceStorageOperation 
abortRebalanceOperation) {
+            return this.abortRebalanceOperation.compareAndSet(null, 
abortRebalanceOperation);
+        }
+
+        /**
+         * Returns the {@link 
#setAbortOperation(AbortRebalanceStorageOperation) set} a abort rebalance 
operation.
+         */
+        @Nullable AbortRebalanceStorageOperation getAbortRebalanceOperation() {
+            return abortRebalanceOperation.get();
+        }
+
+        /**
+         * Returns the start rebalance future.
+         */
+        CompletableFuture<Void> getStartRebalanceFuture() {
+            return startRebalanceFuture;
+        }
+
         @Override
         String inProcessErrorMessage(String storageInfo) {
             return "Storage in the process of starting a rebalance: [" + 
storageInfo + ']';
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/MvPartitionStoragesTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/MvPartitionStoragesTest.java
index 5b7be23e5b..91202f5293 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/MvPartitionStoragesTest.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/MvPartitionStoragesTest.java
@@ -326,8 +326,6 @@ public class MvPartitionStoragesTest {
 
         assertThrowsWithMessage(StorageRebalanceException.class, () -> 
startRebalanceMvStorage(0),
                 "Storage in the process of starting a rebalance");
-        assertThrowsWithMessage(StorageRebalanceException.class, () -> 
abortRebalanceMvStorage(0),
-                "Storage in the process of starting a rebalance");
         assertThrowsWithMessage(StorageRebalanceException.class, () -> 
finishRebalanceMvStorage(0),
                 "Storage in the process of starting a rebalance");
 
@@ -651,6 +649,42 @@ public class MvPartitionStoragesTest {
         assertThat(allForCloseOrDestroyFuture, willCompleteSuccessfully());
     }
 
+    @Test
+    void testAbortRebalanceBeforeFinishStartRebalance() {
+        assertThat(createMvStorage(0), willCompleteSuccessfully());
+
+        CompletableFuture<Void> rebalanceFuture = new CompletableFuture<>();
+
+        CompletableFuture<Void> startRebalanceFuture = 
mvPartitionStorages.startRebalance(0, mvPartitionStorage -> rebalanceFuture);
+
+        CompletableFuture<Void> startAbortRebalanceFuture = new 
CompletableFuture<>();
+        CompletableFuture<Void> finishAbortRebalanceFuture = new 
CompletableFuture<>();
+
+        CompletableFuture<Void> abortRebalanceFuture = 
mvPartitionStorages.abortRebalance(0, mvPartitionStorage -> {
+            startAbortRebalanceFuture.complete(null);
+
+            return finishAbortRebalanceFuture;
+        });
+
+        // Make sure that the abortion of the rebalance does not start until 
the start of the rebalance is over.
+        assertThat(startAbortRebalanceFuture, willTimeoutFast());
+
+        // You can't abort rebalancing a second time.
+        assertThrowsWithMessage(StorageRebalanceException.class, () -> 
abortRebalanceMvStorage(0), "Rebalance abort is already planned");
+
+        rebalanceFuture.complete(null);
+
+        // Let's make sure that the rebalancing abortion will start only after 
the rebalancing start is completed.
+        assertThat(startRebalanceFuture, willCompleteSuccessfully());
+        assertThat(startAbortRebalanceFuture, willCompleteSuccessfully());
+        assertThat(abortRebalanceFuture, willTimeoutFast());
+
+        // Let's finish the rebalancing abortion.
+        finishAbortRebalanceFuture.complete(null);
+
+        assertThat(abortRebalanceFuture, willCompleteSuccessfully());
+    }
+
     private MvPartitionStorage getMvStorage(int partitionId) {
         return mvPartitionStorages.get(partitionId);
     }

Reply via email to