This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 9d4b2852cb IGNITE-23571 resetPartitions improvements: do not try to 
recover replica factor after not manual reset (#4686)
9d4b2852cb is described below

commit 9d4b2852cbcf6c2414f2b6eaaaa9b742a3575762
Author: Mirza Aliev <[email protected]>
AuthorDate: Fri Nov 8 13:00:22 2024 +0400

    IGNITE-23571 resetPartitions improvements: do not try to recover replica 
factor after not manual reset (#4686)
---
 .../rest/recovery/DisasterRecoveryController.java  |   3 +-
 .../ItDisasterRecoveryReconfigurationTest.java     | 243 +++++++++++++++++++--
 .../disaster/DisasterRecoveryManager.java          |  10 +-
 .../DisasterRecoveryRequestSerializer.java         |   6 +-
 ...pUpdateRequest.java => GroupUpdateRequest.java} |  33 ++-
 ...izer.java => GroupUpdateRequestSerializer.java} |  14 +-
 .../DisasterRecoveryRequestSerializerTest.java     |  19 +-
 7 files changed, 277 insertions(+), 51 deletions(-)

diff --git 
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
 
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
index d14c730190..fcec2e49a1 100644
--- 
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
+++ 
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
@@ -87,7 +87,8 @@ public class DisasterRecoveryController implements 
DisasterRecoveryApi, Resource
         return disasterRecoveryManager.resetPartitions(
                 command.zoneName(),
                 command.tableName(),
-                command.partitionIds()
+                command.partitionIds(),
+                true
         );
     }
 
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index dea685073b..405e8b2775 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -27,6 +27,8 @@ import static 
org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
 import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
 import static org.apache.ignite.internal.TestWrappers.unwrapTableManager;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
 import static 
org.apache.ignite.internal.replicator.configuration.ReplicationConfigurationSchema.DEFAULT_IDLE_SAFE_TIME_PROP_DURATION;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
@@ -44,6 +46,7 @@ import static org.hamcrest.Matchers.not;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -67,6 +70,7 @@ import 
org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.lang.RunnableX;
+import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.metastorage.dsl.OperationType;
@@ -98,6 +102,7 @@ import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.tx.TransactionException;
+import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
@@ -234,7 +239,8 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
         CompletableFuture<?> updateFuture = 
node0.disasterRecoveryManager().resetPartitions(
                 zoneName,
                 QUALIFIED_TABLE_NAME,
-                Set.of()
+                Set.of(),
+                true
         );
 
         assertThat(updateFuture, willSucceedIn(60, SECONDS));
@@ -280,7 +286,8 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
         CompletableFuture<?> updateFuture = 
node0.disasterRecoveryManager().resetPartitions(
                 zoneName,
                 QUALIFIED_TABLE_NAME,
-                Set.of(anotherPartId)
+                Set.of(anotherPartId),
+                true
         );
 
         assertThat(updateFuture, willSucceedIn(60, SECONDS));
@@ -319,7 +326,8 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
         CompletableFuture<?> updateFuture = 
node0.disasterRecoveryManager().resetPartitions(
                 zoneName,
                 QUALIFIED_TABLE_NAME,
-                Set.of()
+                Set.of(),
+                true
         );
 
         assertThat(updateFuture, willCompleteSuccessfully());
@@ -355,12 +363,6 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
         int catalogVersion = node0.catalogManager().latestCatalogVersion();
         long timestamp = node0.catalogManager().catalog(catalogVersion).time();
 
-        Assignments assignment013 = Assignments.of(timestamp,
-                Assignment.forPeer(node(0).name()),
-                Assignment.forPeer(node(1).name()),
-                Assignment.forPeer(node(3).name())
-        );
-
         Table table = node0.tables().table(TABLE_NAME);
 
         awaitPrimaryReplica(node0, partId);
@@ -379,18 +381,16 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
 
         assertRealAssignments(node0, partId, 0, 1, 3);
 
-        
cluster.runningNodes().map(TestWrappers::unwrapIgniteImpl).forEach(node -> {
-            BiPredicate<String, NetworkMessage> newPredicate = (nodeName, msg) 
-> stableKeySwitchMessage(msg, partId, assignment013);
-            BiPredicate<String, NetworkMessage> oldPredicate = 
node.dropMessagesPredicate();
+        Assignments assignment013 = Assignments.of(timestamp,
+                Assignment.forPeer(node(0).name()),
+                Assignment.forPeer(node(1).name()),
+                Assignment.forPeer(node(3).name())
+        );
 
-            if (oldPredicate == null) {
-                node.dropMessages(newPredicate);
-            } else {
-                node.dropMessages(oldPredicate.or(newPredicate));
-            }
-        });
+        blockRebalanceStableSwitch(partId, assignment013);
 
-        CompletableFuture<Void> resetFuture = 
node0.disasterRecoveryManager().resetPartitions(zoneName, QUALIFIED_TABLE_NAME, 
emptySet());
+        CompletableFuture<Void> resetFuture =
+                node0.disasterRecoveryManager().resetPartitions(zoneName, 
QUALIFIED_TABLE_NAME, emptySet(), true);
         assertThat(resetFuture, willCompleteSuccessfully());
 
         waitForPartitionState(node0, GlobalPartitionStateEnum.DEGRADED);
@@ -407,7 +407,7 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
 
         waitForPartitionState(node0, GlobalPartitionStateEnum.DEGRADED);
 
-        resetFuture = 
node0.disasterRecoveryManager().resetPartitions(zoneName, QUALIFIED_TABLE_NAME, 
emptySet());
+        resetFuture = 
node0.disasterRecoveryManager().resetPartitions(zoneName, QUALIFIED_TABLE_NAME, 
emptySet(), true);
         assertThat(resetFuture, willCompleteSuccessfully());
 
         waitForPartitionState(node0, GlobalPartitionStateEnum.AVAILABLE);
@@ -434,6 +434,166 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
         });
     }
 
+    /**
+     * Tests that in a situation from the test {@link 
#testInsertFailsIfMajorityIsLost()} it is possible to recover partition using a
+     * disaster recovery API, but with manual flag set to false. We expect 
that in this case the replica factor won't be restored.
+     * In this test, assignments will be (1, 3, 4), according to {@link 
RendezvousDistributionFunction}.
+     */
+    @Test
+    @ZoneParams(nodes = 5, replicas = 3, partitions = 1)
+    void testAutomaticRebalanceIfMajorityIsLost() throws Exception {
+        int partId = 0;
+
+        IgniteImpl node0 = unwrapIgniteImpl(cluster.node(0));
+        Table table = node0.tables().table(TABLE_NAME);
+
+        awaitPrimaryReplica(node0, partId);
+
+        assertRealAssignments(node0, partId, 1, 3, 4);
+
+        stopNodesInParallel(3, 4);
+
+        waitForScale(node0, 3);
+
+        CompletableFuture<?> updateFuture = 
node0.disasterRecoveryManager().resetPartitions(
+                zoneName,
+                QUALIFIED_TABLE_NAME,
+                Set.of(),
+                false
+        );
+
+        assertThat(updateFuture, willSucceedIn(60, SECONDS));
+
+        awaitPrimaryReplica(node0, partId);
+
+        assertRealAssignments(node0, partId, 1);
+
+        List<Throwable> errors = insertValues(table, partId, 0);
+        assertThat(errors, is(empty()));
+
+        // Check that there is no ongoing or planned rebalance.
+        assertNull(getPendingAssignments(node0, partId));
+
+        assertRealAssignments(node0, partId, 1);
+    }
+
+    /**
+     * Tests a scenario where there's a single partition on node 1, and the 
node that hosts it is lost.
+     * A non-manual reset should do nothing in that case, so no new pending or 
planned assignments are present.
+     */
+    @Test
+    @ZoneParams(nodes = 2, replicas = 1, partitions = 1)
+    void testAutomaticRebalanceIfPartitionIsLost() throws Exception {
+        int partId = 0;
+
+        IgniteImpl node0 = unwrapIgniteImpl(cluster.node(0));
+
+        IgniteImpl node1 = unwrapIgniteImpl(cluster.node(1));
+
+        executeSql(format("ALTER ZONE %s SET 
data_nodes_auto_adjust_scale_down=%d", zoneName, INFINITE_TIMER_VALUE));
+
+        awaitPrimaryReplica(node0, partId);
+
+        assertRealAssignments(node0, partId, 1);
+
+        stopNodesInParallel(1);
+
+        CompletableFuture<?> updateFuture = 
node0.disasterRecoveryManager().resetPartitions(
+                zoneName,
+                QUALIFIED_TABLE_NAME,
+                Set.of(),
+                false
+        );
+
+        assertThat(updateFuture, willCompleteSuccessfully());
+
+        // Check that there is no ongoing or planned rebalance.
+        assertNull(getPendingAssignments(node0, partId));
+
+        assertEquals(1, getStableAssignments(node0, partId).nodes().size());
+
+        assertEquals(node1.name(), getStableAssignments(node0, 
partId).nodes().stream().findFirst().get().consistentId());
+    }
+
+    /**
+     * Tests a scenario where only one node from stable is left, but we have 
a node in pending nodes and perform a reset partition operation.
+     * We expect this node from pending to be present after reset, so the 
non-manual reset logic takes pending nodes into account.
+     *
+     * <p>It goes like this:
+     * <ul>
+     *     <li>We have 6 nodes and a partition on nodes 1, 4 and 5.</li>
+     *     <li>We stop node 5, so rebalance on 1, 3, 4 is triggered, but 
blocked and cannot be finished.</li>
+     *     <li>Zones scale down is set to infinite value, we stop node 4 and 
new rebalance is not triggered and majority is lost.</li>
+     *     <li>We execute "resetPartitions" and expect that pending 
assignments will be 1, 3, so node 3 from pending is present.</li>
+     * </ul>
+     */
+    @Test
+    @ZoneParams(nodes = 6, replicas = 3, partitions = 1)
+    public void testIncompleteRebalanceBeforeAutomaticResetPartitions() throws 
Exception {
+        int partId = 0;
+
+        IgniteImpl node0 = unwrapIgniteImpl(cluster.node(0));
+
+        int catalogVersion = node0.catalogManager().latestCatalogVersion();
+        long timestamp = node0.catalogManager().catalog(catalogVersion).time();
+
+        Table table = node0.tables().table(TABLE_NAME);
+
+        awaitPrimaryReplica(node0, partId);
+        assertRealAssignments(node0, partId, 1, 4, 5);
+
+        insertValues(table, partId, 0);
+
+        Assignments assignment134 = Assignments.of(timestamp,
+                Assignment.forPeer(node(1).name()),
+                Assignment.forPeer(node(3).name()),
+                Assignment.forPeer(node(4).name())
+        );
+
+        blockRebalanceStableSwitch(partId, assignment134);
+
+        stopNode(5);
+
+        waitForScale(node0, 5);
+
+        assertRealAssignments(node0, partId, 1, 3, 4);
+
+        assertPendingAssignments(node0, partId, assignment134);
+
+        assertFalse(getPendingAssignments(node0, partId).force());
+
+        waitForPartitionState(node0, GlobalPartitionStateEnum.AVAILABLE);
+
+        executeSql(format("ALTER ZONE %s SET 
data_nodes_auto_adjust_scale_down=%d", zoneName, INFINITE_TIMER_VALUE));
+
+        stopNode(4);
+
+        CompletableFuture<Void> resetFuture =
+                node0.disasterRecoveryManager().resetPartitions(zoneName, 
QUALIFIED_TABLE_NAME, emptySet(), false);
+        assertThat(resetFuture, willCompleteSuccessfully());
+
+        Assignments assignmentForced13 = 
Assignments.forced(Set.of(Assignment.forPeer(node(1).name()),
+                Assignment.forPeer(node(3).name())), timestamp);
+
+        assertPendingAssignments(node0, partId, assignmentForced13);
+    }
+
+    /**
+     * Block rebalance, so stable won't be switched to specified pending.
+     */
+    private void blockRebalanceStableSwitch(int partId, Assignments 
assignment) {
+        
cluster.runningNodes().map(TestWrappers::unwrapIgniteImpl).forEach(node -> {
+            BiPredicate<String, NetworkMessage> newPredicate = (nodeName, msg) 
-> stableKeySwitchMessage(msg, partId, assignment);
+            BiPredicate<String, NetworkMessage> oldPredicate = 
node.dropMessagesPredicate();
+
+            if (oldPredicate == null) {
+                node.dropMessages(newPredicate);
+            } else {
+                node.dropMessages(oldPredicate.or(newPredicate));
+            }
+        });
+    }
+
     private boolean stableKeySwitchMessage(NetworkMessage msg, int partId, 
Assignments blockedAssignments) {
         if (msg instanceof WriteActionRequest) {
             var writeActionRequest = (WriteActionRequest) msg;
@@ -476,7 +636,10 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
             GlobalPartitionState state = map.values().iterator().next();
 
             return state.state == expectedState;
-        }, 500, 20_000));
+        }, 500, 20_000),
+                () -> "Expected state: " + expectedState
+                        + ", actual: " + 
node0.disasterRecoveryManager().globalPartitionStates(Set.of(zoneName), 
emptySet()).join()
+        );
     }
 
     private void triggerRaftSnapshot(int nodeIdx, int partId) throws 
InterruptedException, ExecutionException {
@@ -504,7 +667,17 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
     }
 
     private void assertRealAssignments(IgniteImpl node0, int partId, 
Integer... expected) throws InterruptedException {
-        assertTrue(waitForCondition(() -> 
List.of(expected).equals(getRealAssignments(node0, partId)), 2000));
+        assertTrue(
+                waitForCondition(() -> 
List.of(expected).equals(getRealAssignments(node0, partId)), 2000),
+                () -> "Expected: " + List.of(expected) + ", actual: " + 
getRealAssignments(node0, partId)
+        );
+    }
+
+    private void assertPendingAssignments(IgniteImpl node0, int partId, 
Assignments expected) throws InterruptedException {
+        assertTrue(
+                waitForCondition(() -> 
expected.equals(getPendingAssignments(node0, partId)), 2000),
+                () -> "Expected: " + expected + ", actual: " + 
getPendingAssignments(node0, partId)
+        );
     }
 
     /**
@@ -585,6 +758,10 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
         }, 250, SECONDS.toMillis(60)));
     }
 
+    /**
+     * Return assignments based on states of partitions in the cluster. It is 
possible that returned value contains nodes
+     * from stable and pending, for example, when rebalance is in progress.
+     */
     private List<Integer> getRealAssignments(IgniteImpl node0, int partId) {
         CompletableFuture<Map<TablePartitionId, LocalPartitionStateByNode>> 
partitionStatesFut = node0.disasterRecoveryManager()
                 .localPartitionStates(Set.of(zoneName), Set.of(), Set.of());
@@ -599,6 +776,28 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
                 .collect(Collectors.toList());
     }
 
+    private @Nullable Assignments getPendingAssignments(IgniteImpl node0, int 
partId) {
+        CompletableFuture<Entry> pendingFut = node0.metaStorageManager()
+                .get(pendingPartAssignmentsKey(new TablePartitionId(tableId, 
partId)));
+
+        assertThat(pendingFut, willCompleteSuccessfully());
+
+        Entry pending = pendingFut.join();
+
+        return pending.empty() ? null : Assignments.fromBytes(pending.value());
+    }
+
+    private @Nullable Assignments getStableAssignments(IgniteImpl node0, int 
partId) {
+        CompletableFuture<Entry> stableFut = node0.metaStorageManager()
+                .get(stablePartAssignmentsKey(new TablePartitionId(tableId, 
partId)));
+
+        assertThat(stableFut, willCompleteSuccessfully());
+
+        Entry stable = stableFut.join();
+
+        return stable.empty() ? null : Assignments.fromBytes(stable.value());
+    }
+
     @Retention(RetentionPolicy.RUNTIME)
     @interface ZoneParams {
         int replicas();
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index a30aedf306..b411af60c0 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -102,7 +102,8 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Manager, responsible for "disaster recovery" operations.
  * Internally it triggers meta-storage updates, in order to acquire unique 
causality token.
- * As a reaction to these updates, manager performs actual recovery 
operations, such as {@link #resetPartitions(String, String, Set)}.
+ * As a reaction to these updates, manager performs actual recovery operations,
+ * such as {@link #resetPartitions(String, String, Set, boolean)}.
  * More details are in the <a 
href="https://issues.apache.org/jira/browse/IGNITE-21140">epic</a>.
  */
 public class DisasterRecoveryManager implements IgniteComponent, 
SystemViewProvider {
@@ -247,9 +248,10 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
      * @param zoneName Name of the distribution zone. Case-sensitive, without 
quotes.
      * @param tableName Fully-qualified table name. Case-sensitive, without 
quotes. Example: "PUBLIC.Foo".
      * @param partitionIds IDs of partitions to reset. If empty, reset all 
zone's partitions.
+     * @param manualUpdate Whether the update is triggered manually by user or 
automatically by core logic.
      * @return Future that completes when partitions are reset.
      */
-    public CompletableFuture<Void> resetPartitions(String zoneName, String 
tableName, Set<Integer> partitionIds) {
+    public CompletableFuture<Void> resetPartitions(String zoneName, String 
tableName, Set<Integer> partitionIds, boolean manualUpdate) {
         try {
             Catalog catalog = catalogLatestVersion();
 
@@ -259,7 +261,9 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
 
             checkPartitionsRange(partitionIds, Set.of(zone));
 
-            return processNewRequest(new 
ManualGroupUpdateRequest(UUID.randomUUID(), catalog.version(), zone.id(), 
tableId, partitionIds));
+            return processNewRequest(
+                    new GroupUpdateRequest(UUID.randomUUID(), 
catalog.version(), zone.id(), tableId, partitionIds, manualUpdate)
+            );
         } catch (Throwable t) {
             return failedFuture(t);
         }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestSerializer.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestSerializer.java
index 4a7c96c74e..7f407af488 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestSerializer.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestSerializer.java
@@ -50,7 +50,7 @@ class DisasterRecoveryRequestSerializer extends 
VersionedSerializer<DisasterReco
     }
 
     private enum Operation {
-        MANUAL_GROUP_UPDATE(0, ManualGroupUpdateRequestSerializer.INSTANCE),
+        GROUP_UPDATE(0, GroupUpdateRequestSerializer.INSTANCE),
         MANUAL_GROUP_RESTART(1, ManualGroupRestartRequestSerializer.INSTANCE);
 
         private static final Map<Integer, Operation> valuesByCode = 
Arrays.stream(values())
@@ -75,8 +75,8 @@ class DisasterRecoveryRequestSerializer extends 
VersionedSerializer<DisasterReco
         }
 
         static Operation findByRequest(DisasterRecoveryRequest request) {
-            if (request instanceof ManualGroupUpdateRequest) {
-                return MANUAL_GROUP_UPDATE;
+            if (request instanceof GroupUpdateRequest) {
+                return GROUP_UPDATE;
             }
             if (request instanceof ManualGroupRestartRequest) {
                 return MANUAL_GROUP_RESTART;
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequest.java
similarity index 94%
rename from 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java
rename to 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequest.java
index fb05b50d16..2abaffaf99 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequest.java
@@ -73,7 +73,7 @@ import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.internal.util.CollectionUtils;
 import org.jetbrains.annotations.Nullable;
 
-class ManualGroupUpdateRequest implements DisasterRecoveryRequest {
+class GroupUpdateRequest implements DisasterRecoveryRequest {
     private final UUID operationId;
 
     /**
@@ -87,12 +87,15 @@ class ManualGroupUpdateRequest implements 
DisasterRecoveryRequest {
 
     private final Set<Integer> partitionIds;
 
-    ManualGroupUpdateRequest(UUID operationId, int catalogVersion, int zoneId, 
int tableId, Set<Integer> partitionIds) {
+    private final boolean manualUpdate;
+
+    GroupUpdateRequest(UUID operationId, int catalogVersion, int zoneId, int 
tableId, Set<Integer> partitionIds, boolean manualUpdate) {
         this.operationId = operationId;
         this.catalogVersion = catalogVersion;
         this.zoneId = zoneId;
         this.tableId = tableId;
         this.partitionIds = Set.copyOf(partitionIds);
+        this.manualUpdate = manualUpdate;
     }
 
     @Override
@@ -122,6 +125,10 @@ class ManualGroupUpdateRequest implements 
DisasterRecoveryRequest {
         return partitionIds;
     }
 
+    public boolean manualUpdate() {
+        return manualUpdate;
+    }
+
     @Override
     public CompletableFuture<Void> handle(DisasterRecoveryManager 
disasterRecoveryManager, long msRevision, HybridTimestamp msTimestamp) {
         int catalogVersion = 
disasterRecoveryManager.catalogManager.activeCatalogVersion(msTimestamp.longValue());
@@ -206,7 +213,7 @@ class ManualGroupUpdateRequest implements 
DisasterRecoveryRequest {
             TablePartitionId replicaGrpId = new 
TablePartitionId(tableDescriptor.id(), partitionIdsArray[partitionId]);
 
             futures[partitionId] = 
tableAssignmentsFut.thenCompose(tableAssignments ->
-                    tableAssignments.isEmpty() ? nullCompletedFuture() : 
manualPartitionUpdate(
+                    tableAssignments.isEmpty() ? nullCompletedFuture() : 
partitionUpdate(
                             replicaGrpId,
                             aliveDataNodes,
                             aliveNodesConsistentIds,
@@ -215,7 +222,8 @@ class ManualGroupUpdateRequest implements 
DisasterRecoveryRequest {
                             metaStorageManager,
                             
tableAssignments.get(replicaGrpId.partitionId()).nodes(),
                             localStatesMap.get(replicaGrpId),
-                            assignmentsTimestamp
+                            assignmentsTimestamp,
+                            this.manualUpdate
                     )).thenAccept(res -> {
                         DisasterRecoveryManager.LOG.info(
                                 "Partition {} returned {} status on reset 
attempt", replicaGrpId, UpdateStatus.valueOf(res)
@@ -227,7 +235,7 @@ class ManualGroupUpdateRequest implements 
DisasterRecoveryRequest {
         return futures;
     }
 
-    private static CompletableFuture<Integer> manualPartitionUpdate(
+    private static CompletableFuture<Integer> partitionUpdate(
             TablePartitionId partId,
             Collection<String> aliveDataNodes,
             Set<String> aliveNodesConsistentIds,
@@ -236,7 +244,8 @@ class ManualGroupUpdateRequest implements 
DisasterRecoveryRequest {
             MetaStorageManager metaStorageMgr,
             Set<Assignment> currentAssignments,
             LocalPartitionStateMessageByNode localPartitionStateMessageByNode,
-            long assignmentsTimestamp
+            long assignmentsTimestamp,
+            boolean manualUpdate
     ) {
         Set<Assignment> partAssignments = 
getAliveNodesWithData(aliveNodesConsistentIds, 
localPartitionStateMessageByNode);
         Set<Assignment> aliveStableNodes = 
CollectionUtils.intersect(currentAssignments, partAssignments);
@@ -248,6 +257,10 @@ class ManualGroupUpdateRequest implements 
DisasterRecoveryRequest {
         Iif invokeClosure;
 
         if (aliveStableNodes.isEmpty()) {
+            if (!manualUpdate) {
+                return completedFuture(ASSIGNMENT_NOT_UPDATED.ordinal());
+            }
+
             enrichAssignments(partId, aliveDataNodes, replicas, 
partAssignments);
 
             // There are no known nodes with data, which means that we can 
just put new assignments into pending assignments with "forced"
@@ -260,10 +273,14 @@ class ManualGroupUpdateRequest implements 
DisasterRecoveryRequest {
             );
         } else {
             Set<Assignment> stableAssignments = Set.copyOf(partAssignments);
-            enrichAssignments(partId, aliveDataNodes, replicas, 
partAssignments);
+
+            if (manualUpdate) {
+                enrichAssignments(partId, aliveDataNodes, replicas, 
partAssignments);
+            }
 
             // There are nodes with data, and we set pending assignments to 
this set of nodes. It'll be the source of peers for
-            // "resetPeers", and after that new assignments with restored 
replica factor wil be picked up from planned assignments.
+            // "resetPeers", and after that new assignments with restored 
replica factor will be picked up from planned assignments
+            // for the case of the manual update, that was triggered by a user.
             invokeClosure = prepareMsInvokeClosure(
                     partId,
                     longToBytesKeepingOrder(revision),
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequestSerializer.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestSerializer.java
similarity index 73%
rename from 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequestSerializer.java
rename to 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestSerializer.java
index ee34659b09..50df274644 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequestSerializer.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestSerializer.java
@@ -28,29 +28,31 @@ import org.apache.ignite.internal.util.io.IgniteDataOutput;
 import org.apache.ignite.internal.versioned.VersionedSerializer;
 
 /**
- * {@link VersionedSerializer} for {@link ManualGroupUpdateRequest} instances.
+ * {@link VersionedSerializer} for {@link GroupUpdateRequest} instances.
  */
-class ManualGroupUpdateRequestSerializer extends 
VersionedSerializer<ManualGroupUpdateRequest> {
+class GroupUpdateRequestSerializer extends 
VersionedSerializer<GroupUpdateRequest> {
     /** Serializer instance. */
-    static final ManualGroupUpdateRequestSerializer INSTANCE = new 
ManualGroupUpdateRequestSerializer();
+    static final GroupUpdateRequestSerializer INSTANCE = new 
GroupUpdateRequestSerializer();
 
     @Override
-    protected void writeExternalData(ManualGroupUpdateRequest request, 
IgniteDataOutput out) throws IOException {
+    protected void writeExternalData(GroupUpdateRequest request, 
IgniteDataOutput out) throws IOException {
         out.writeUuid(request.operationId());
         out.writeVarInt(request.catalogVersion());
         out.writeVarInt(request.zoneId());
         out.writeVarInt(request.tableId());
         writeVarIntSet(request.partitionIds(), out);
+        out.writeBoolean(request.manualUpdate());
     }
 
     @Override
-    protected ManualGroupUpdateRequest readExternalData(byte protoVer, 
IgniteDataInput in) throws IOException {
+    protected GroupUpdateRequest readExternalData(byte protoVer, 
IgniteDataInput in) throws IOException {
         UUID operationId = in.readUuid();
         int catalogVersion = in.readVarIntAsInt();
         int zoneId = in.readVarIntAsInt();
         int tableId = in.readVarIntAsInt();
         Set<Integer> partitionIds = readVarIntSet(in);
+        boolean manualUpdate = in.readBoolean();
 
-        return new ManualGroupUpdateRequest(operationId, catalogVersion, 
zoneId, tableId, partitionIds);
+        return new GroupUpdateRequest(operationId, catalogVersion, zoneId, 
tableId, partitionIds, manualUpdate);
     }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestSerializerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestSerializerTest.java
index 39c95aee15..a9d1ec0217 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestSerializerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestSerializerTest.java
@@ -28,41 +28,44 @@ import 
org.apache.ignite.internal.versioned.VersionedSerialization;
 import org.junit.jupiter.api.Test;
 
 class DisasterRecoveryRequestSerializerTest {
-    private static final String MANUAL_GROUP_UPDATE_REQUEST_V1_BASE64 = 
"Ae++QwEB775D782rkHhWNBIhQ2WHCbrc/ukH0Q+5FwQWDCA=";
+    private static final String GROUP_UPDATE_REQUEST_V1_BASE64 = 
"Ae++QwEB775D782rkHhWNBIhQ2WHCbrc/ukH0Q+5FwQMIBYB";
     private static final String MANUAL_GROUP_RESTART_REQUEST_V1_BASE64 = 
"Ae++QwIB775D782rkHhWNBIhQ2WHCbrc/tEPuRcEIBYMAwJiAmH///9///+AgAQ=";
 
     private final DisasterRecoveryRequestSerializer serializer = new 
DisasterRecoveryRequestSerializer();
 
     @Test
-    void serializationAndDeserializationOfManualGroupUpdateRequest() {
-        var originalRequest = new ManualGroupUpdateRequest(
+    void serializationAndDeserializationOfGroupUpdateRequest() {
+        var originalRequest = new GroupUpdateRequest(
                 new UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L),
                 1000,
                 2000,
                 3000,
-                Set.of(11, 21, 31)
+                Set.of(11, 21, 31),
+                true
         );
 
         byte[] bytes = VersionedSerialization.toBytes(originalRequest, 
serializer);
-        ManualGroupUpdateRequest restoredRequest = (ManualGroupUpdateRequest) 
VersionedSerialization.fromBytes(bytes, serializer);
+        GroupUpdateRequest restoredRequest = (GroupUpdateRequest) 
VersionedSerialization.fromBytes(bytes, serializer);
 
         assertThat(restoredRequest.operationId(), is(new 
UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L)));
         assertThat(restoredRequest.catalogVersion(), is(1000));
         assertThat(restoredRequest.zoneId(), is(2000));
         assertThat(restoredRequest.tableId(), is(3000));
         assertThat(restoredRequest.partitionIds(), is(Set.of(11, 21, 31)));
+        assertThat(restoredRequest.manualUpdate(), is(true));
     }
 
     @Test
-    void v1OfManualGroupUpdateRequestCanBeDeserialized() {
-        byte[] bytes = 
Base64.getDecoder().decode(MANUAL_GROUP_UPDATE_REQUEST_V1_BASE64);
-        ManualGroupUpdateRequest restoredRequest = (ManualGroupUpdateRequest) 
VersionedSerialization.fromBytes(bytes, serializer);
+    void v1OfGroupUpdateRequestCanBeDeserialized() {
+        byte[] bytes = 
Base64.getDecoder().decode(GROUP_UPDATE_REQUEST_V1_BASE64);
+        GroupUpdateRequest restoredRequest = (GroupUpdateRequest) 
VersionedSerialization.fromBytes(bytes, serializer);
 
         assertThat(restoredRequest.operationId(), is(new 
UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L)));
         assertThat(restoredRequest.catalogVersion(), is(1000));
         assertThat(restoredRequest.zoneId(), is(2000));
         assertThat(restoredRequest.tableId(), is(3000));
         assertThat(restoredRequest.partitionIds(), is(Set.of(11, 21, 31)));
+        assertThat(restoredRequest.manualUpdate(), is(true));
     }
 
     @Test


Reply via email to