This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new f5875a74777 IGNITE-25870 Unify behavior of REST and CLI (#6288)
f5875a74777 is described below

commit f5875a74777170007fa1c15a2320f9ab8223153f
Author: Cyrill <[email protected]>
AuthorDate: Wed Jul 23 14:28:25 2025 +0300

    IGNITE-25870 Unify behavior of REST and CLI (#6288)
---
 .../network/PartitionReplicationMessageGroup.java  |  12 ++
 .../disaster/LocalPartitionStatesResponse.java     |   2 +-
 ...e.java => LocalTablePartitionStateMessage.java} |  14 +-
 .../disaster/LocalTablePartitionStateRequest.java} |  41 +---
 ....java => LocalTablePartitionStateResponse.java} |  10 +-
 .../rest/recovery/DisasterRecoveryController.java  |  59 ------
 .../disaster/DisasterRecoveryManager.java          | 225 +++++++++++++++++----
 .../disaster/LocalPartitionStateMessageByNode.java |   4 +
 .../disaster/ItDisasterRecoveryManagerTest.java    |  82 ++++++++
 9 files changed, 308 insertions(+), 141 deletions(-)

diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
index 87373e7e836..09c2f55e802 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
@@ -35,6 +35,9 @@ import 
org.apache.ignite.internal.partition.replicator.network.command.WriteInte
 import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateMessage;
 import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStatesRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStatesResponse;
+import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalTablePartitionStateMessage;
+import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalTablePartitionStateRequest;
+import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalTablePartitionStateResponse;
 import 
org.apache.ignite.internal.partition.replicator.network.message.HasDataRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.message.HasDataResponse;
 import 
org.apache.ignite.internal.partition.replicator.network.raft.PartitionSnapshotMeta;
@@ -287,5 +290,14 @@ public interface PartitionReplicationMessageGroup {
 
         /** Message type for {@link LocalPartitionStatesResponse}. */
         short LOCAL_PARTITION_STATE_RESPONSE = 102;
+
+        /** Message type for {@link LocalTablePartitionStateMessage}. */
+        short LOCAL_TABLE_PARTITION_STATE = 103;
+
+        /** Message type for {@link LocalTablePartitionStateRequest}. */
+        short LOCAL_TABLE_PARTITION_STATE_REQUEST = 104;
+
+        /** Message type for {@link LocalTablePartitionStateResponse}. */
+        short LOCAL_TABLE_PARTITION_STATE_RESPONSE = 105;
     }
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalPartitionStatesResponse.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalPartitionStatesResponse.java
index 2ac461e0dbc..9841c7428ef 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalPartitionStatesResponse.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalPartitionStatesResponse.java
@@ -23,7 +23,7 @@ import 
org.apache.ignite.internal.network.annotations.Transferable;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages;
 
 /**
- * Response for {@link LocalPartitionStatesResponse}.
+ * Response for {@link LocalPartitionStatesRequest}.
  */
 @Transferable(DisasterRecoveryMessages.LOCAL_PARTITION_STATE_RESPONSE)
 public interface LocalPartitionStatesResponse extends NetworkMessage {
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalPartitionStatesResponse.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTablePartitionStateMessage.java
similarity index 69%
copy from 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalPartitionStatesResponse.java
copy to 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTablePartitionStateMessage.java
index 2ac461e0dbc..d0478b5b933 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalPartitionStatesResponse.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTablePartitionStateMessage.java
@@ -17,15 +17,19 @@
 
 package org.apache.ignite.internal.partition.replicator.network.disaster;
 
-import java.util.List;
+import java.util.Map;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.annotations.Transferable;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
 
 /**
- * Response for {@link LocalPartitionStatesResponse}.
+ * A message for reading estimated number of rows for tables stored on this 
node.
  */
-@Transferable(DisasterRecoveryMessages.LOCAL_PARTITION_STATE_RESPONSE)
-public interface LocalPartitionStatesResponse extends NetworkMessage {
-    List<LocalPartitionStateMessage> states();
+@Transferable(DisasterRecoveryMessages.LOCAL_TABLE_PARTITION_STATE)
+public interface LocalTablePartitionStateMessage extends NetworkMessage {
+    /**
+     * Returns estimated number of rows for tables stored on this node.
+     */
+    Map<TablePartitionIdMessage, Long> tablePartitionIdToEstimatedRowsMap();
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/TableState.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTablePartitionStateRequest.java
similarity index 52%
rename from 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/TableState.java
rename to 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTablePartitionStateRequest.java
index e3d84ab12df..63254c42172 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/TableState.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTablePartitionStateRequest.java
@@ -15,39 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.table.distributed.disaster;
+package org.apache.ignite.internal.partition.replicator.network.disaster;
 
-import org.apache.ignite.internal.tostring.S;
+import java.util.Set;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages;
+import org.apache.ignite.internal.replicator.message.ZonePartitionIdMessage;
 
 /**
- * Table state.
+ * Request for reading table states from the node.
  */
-public class TableState {
+@Transferable(DisasterRecoveryMessages.LOCAL_TABLE_PARTITION_STATE_REQUEST)
+public interface LocalTablePartitionStateRequest extends NetworkMessage {
+    Set<ZonePartitionIdMessage> zonePartitionIds();
 
-    private final int tableId;
-    private final String schemaName;
-    private final String tableName;
-
-    TableState(int tableId, String tableName, String schemaName) {
-        this.tableId = tableId;
-        this.schemaName = schemaName;
-        this.tableName = tableName;
-    }
-
-    public int tableId() {
-        return tableId;
-    }
-
-    public String schemaName() {
-        return schemaName;
-    }
-
-    public String tableName() {
-        return tableName;
-    }
-
-    @Override
-    public String toString() {
-        return S.toString(TableState.class, this);
-    }
+    int catalogVersion();
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalPartitionStatesResponse.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTablePartitionStateResponse.java
similarity index 80%
copy from 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalPartitionStatesResponse.java
copy to 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTablePartitionStateResponse.java
index 2ac461e0dbc..4ca7e0b1a26 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalPartitionStatesResponse.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTablePartitionStateResponse.java
@@ -17,15 +17,15 @@
 
 package org.apache.ignite.internal.partition.replicator.network.disaster;
 
-import java.util.List;
+import java.util.Set;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.annotations.Transferable;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages;
 
 /**
- * Response for {@link LocalPartitionStatesResponse}.
+ * Response for {@link LocalTablePartitionStateRequest}.
  */
-@Transferable(DisasterRecoveryMessages.LOCAL_PARTITION_STATE_RESPONSE)
-public interface LocalPartitionStatesResponse extends NetworkMessage {
-    List<LocalPartitionStateMessage> states();
+@Transferable(DisasterRecoveryMessages.LOCAL_TABLE_PARTITION_STATE_RESPONSE)
+public interface LocalTablePartitionStateResponse extends NetworkMessage {
+    Set<LocalTablePartitionStateMessage> states();
 }
diff --git 
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
 
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
index 99a0988d3a2..bae1f0741c1 100644
--- 
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
+++ 
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
@@ -55,7 +55,6 @@ import 
org.apache.ignite.internal.table.distributed.disaster.LocalPartitionState
 import 
org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateByNode;
 import 
org.apache.ignite.internal.table.distributed.disaster.LocalTablePartitionState;
 import 
org.apache.ignite.internal.table.distributed.disaster.LocalTablePartitionStateByNode;
-import org.apache.ignite.internal.table.distributed.disaster.TableState;
 import org.apache.ignite.table.QualifiedName;
 
 /**
@@ -78,13 +77,6 @@ public class DisasterRecoveryController implements 
DisasterRecoveryApi, Resource
             Optional<Set<String>> nodeNames,
             Optional<Set<Integer>> partitionIds
     ) {
-        if (nodeProperties.colocationEnabled()) {
-            // The table response is actually a superset of the zone response, 
so should be fine to convert it.
-            CompletableFuture<LocalZonePartitionStatesResponse> zoneStates =
-                    getZoneLocalPartitionStates(zoneNames, nodeNames, 
partitionIds);
-            return zoneStates.thenApply(zoneResponse -> 
convertLocalZoneToTableStates(zoneResponse, disasterRecoveryManager));
-        }
-
         return disasterRecoveryManager.localTablePartitionStates(
                         zoneNames.orElse(Set.of()),
                         nodeNames.orElse(Set.of()),
@@ -98,14 +90,6 @@ public class DisasterRecoveryController implements 
DisasterRecoveryApi, Resource
             Optional<Set<String>> zoneNames,
             Optional<Set<Integer>> partitionIds
     ) {
-        if (nodeProperties.colocationEnabled()) {
-            // The table response is actually a superset of the zone response, 
so should be fine to convert it.
-
-            CompletableFuture<GlobalZonePartitionStatesResponse> zoneStates =
-                    getZoneGlobalPartitionStates(zoneNames, partitionIds);
-            return zoneStates.thenApply(zoneResponse -> 
convertGlobalZoneToTableStates(zoneResponse, disasterRecoveryManager));
-        }
-
         return disasterRecoveryManager.globalTablePartitionStates(
                         zoneNames.orElse(Set.of()),
                         partitionIds.orElse(Set.of())
@@ -192,28 +176,6 @@ public class DisasterRecoveryController implements 
DisasterRecoveryApi, Resource
         ).thenApply(DisasterRecoveryController::convertGlobalZoneStates);
     }
 
-    private static LocalPartitionStatesResponse convertLocalZoneToTableStates(
-            LocalZonePartitionStatesResponse zoneResponse,
-            DisasterRecoveryManager manager) {
-        List<LocalPartitionStateResponse> states = new ArrayList<>();
-
-        for (LocalZonePartitionStateResponse zoneState : 
zoneResponse.states()) {
-            for (TableState tableState : 
manager.zoneTablesStates(zoneState.zoneName())) {
-                states.add(new LocalPartitionStateResponse(
-                        zoneState.nodeName(),
-                        zoneState.zoneName(),
-                        tableState.schemaName(),
-                        tableState.tableId(),
-                        tableState.tableName(),
-                        zoneState.partitionId(),
-                        zoneState.state(),
-                        zoneState.estimatedRows()
-                ));
-            }
-        }
-        return createLocalPartitionStatesResponse(states);
-    }
-
     private static LocalPartitionStatesResponse 
convertLocalTableStates(Map<TablePartitionId, LocalTablePartitionStateByNode> 
localStates) {
         List<LocalPartitionStateResponse> states = new ArrayList<>();
 
@@ -290,27 +252,6 @@ public class DisasterRecoveryController implements 
DisasterRecoveryApi, Resource
         return createGlobalPartitionStatesResponse(states);
     }
 
-    private static GlobalPartitionStatesResponse 
convertGlobalZoneToTableStates(
-            GlobalZonePartitionStatesResponse zoneResponse,
-            DisasterRecoveryManager manager
-    ) {
-        List<GlobalPartitionStateResponse> states = new ArrayList<>();
-
-        for (GlobalZonePartitionStateResponse zoneState : 
zoneResponse.states()) {
-            for (TableState tableState : 
manager.zoneTablesStates(zoneState.zoneName())) {
-                states.add(new GlobalPartitionStateResponse(
-                        zoneState.zoneName(),
-                        tableState.schemaName(),
-                        tableState.tableId(),
-                        tableState.tableName(),
-                        zoneState.partitionId(),
-                        zoneState.state()
-                ));
-            }
-        }
-        return createGlobalPartitionStatesResponse(states);
-    }
-
     private static GlobalPartitionStatesResponse 
createGlobalPartitionStatesResponse(List<GlobalPartitionStateResponse> states) {
         // Sort the output conveniently.
         states.sort(comparing(GlobalPartitionStateResponse::schemaName)
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index f049bb7829a..bc22e3da915 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.table.distributed.disaster;
 
 import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.stream.Collectors.groupingBy;
@@ -102,6 +103,9 @@ import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPar
 import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateMessage;
 import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStatesRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStatesResponse;
+import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalTablePartitionStateMessage;
+import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalTablePartitionStateRequest;
+import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalTablePartitionStateResponse;
 import org.apache.ignite.internal.partitiondistribution.Assignment;
 import org.apache.ignite.internal.partitiondistribution.Assignments;
 import org.apache.ignite.internal.raft.Loza;
@@ -109,10 +113,13 @@ import 
org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
+import org.apache.ignite.internal.replicator.message.ZonePartitionIdMessage;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.systemview.api.SystemView;
 import org.apache.ignite.internal.systemview.api.SystemViewManager;
 import org.apache.ignite.internal.systemview.api.SystemViewProvider;
+import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import 
org.apache.ignite.internal.table.distributed.disaster.exceptions.DisasterRecoveryException;
@@ -787,7 +794,9 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
                         catalog,
                         zoneState()
                 )
-                        .thenApply(res -> zoneStateToTableState(res, catalog))
+                        .thenCompose(res -> 
tableStateForZone(toZonesOnNodes(res), catalog.version())
+                                .thenApply(tableState -> 
zoneStateToTableState(res, tableState, catalog))
+                        )
                         .thenApply(res -> normalizeTableLocal(res, catalog));
             }
 
@@ -826,7 +835,9 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
                         catalog,
                         zoneState()
                 )
-                        .thenApply(res -> zoneStateToTableState(res, catalog))
+                        .thenCompose(res -> 
tableStateForZone(toZonesOnNodes(res), catalog.version())
+                                .thenApply(tableState -> 
zoneStateToTableState(res, tableState, catalog))
+                        )
                         .thenApply(res -> normalizeTableLocal(res, catalog))
                         .thenApply(res -> assembleTableGlobal(res, 
partitionIds, catalog));
             }
@@ -845,16 +856,108 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
         }
     }
 
-    private Map<TablePartitionId, LocalPartitionStateMessageByNode> 
zoneStateToTableState(
+    /**
+     * Converts {@link LocalPartitionStateMessageByNode} to a mapping of zone 
names to the set of zone partitions.
+     *
+     * @param partitionStateMap Partition state map.
+     * @return Mapping of zone names to the set of zone partitions.
+     */
+    private static Map<String, Set<ZonePartitionId>> toZonesOnNodes(
+            Map<ZonePartitionId, LocalPartitionStateMessageByNode> 
partitionStateMap
+    ) {
+        Map<String, Set<ZonePartitionId>> res = new HashMap<>();
+
+        for (Map.Entry<ZonePartitionId, LocalPartitionStateMessageByNode> 
entry : partitionStateMap.entrySet()) {
+            ZonePartitionId zonePartitionId = entry.getKey();
+
+            LocalPartitionStateMessageByNode 
zoneLocalPartitionStateMessageByNode = entry.getValue();
+
+            for (String nodeName : 
zoneLocalPartitionStateMessageByNode.nodes()) {
+                res.computeIfAbsent(nodeName, k -> new 
HashSet<>()).add(zonePartitionId);
+            }
+        }
+
+        return res;
+    }
+
+    /**
+     * Returns estimated number of rows for each table having a partition in 
the specified zones.
+     *
+     * <p>The result is returned from the nodes specified in the {@code 
zonesOnNodes.keySet()} -
+     * these are the nodes we previously received partition states from.
+     *
+     * @param zonesOnNodes Mapping of node names to the set of zone partitions.
+     * @param catalogVersion Catalog version.
+     * @return Future with the mapping.
+     */
+    private CompletableFuture<Map<String, Map<TablePartitionIdMessage, Long>>> 
tableStateForZone(
+            Map<String, Set<ZonePartitionId>> zonesOnNodes,
+            int catalogVersion
+    ) {
+        Map<String, Map<TablePartitionIdMessage, Long>> result = new 
ConcurrentHashMap<>();
+
+        CompletableFuture<?>[] futures = zonesOnNodes.entrySet().stream()
+                .map(entry ->
+                        tableStateForZoneOnNode(catalogVersion, 
entry.getKey(), entry.getValue())
+                                .thenAccept(response ->
+                                        response.states().forEach(state -> {
+                                            
result.computeIfAbsent(entry.getKey(), k -> new ConcurrentHashMap<>())
+                                                    
.putAll(state.tablePartitionIdToEstimatedRowsMap());
+                                        })
+                                )
+                ).toArray(CompletableFuture[]::new);
+
+        return allOf(futures).handle((unused, err) -> {
+            if (err != null) {
+                throw new DisasterRecoveryException(PARTITION_STATE_ERR, err);
+            }
+
+            return result;
+        });
+    }
+
+    /**
+     * Returns estimated number of rows for each table having a partition in 
the specified zones.
+     *
+     * @param catalogVersion Catalog version.
+     * @param node Node we get table partition states from.
+     * @param zones Set of zone partitions.
+     * @return Future with the mapping.
+     */
+    private CompletableFuture<LocalTablePartitionStateResponse> 
tableStateForZoneOnNode(
+            int catalogVersion,
+            String node,
+            Set<ZonePartitionId> zones
+    ) {
+        Set<ZonePartitionIdMessage> zoneMessage = zones.stream()
+                .map(zonePartitionId -> 
toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY, zonePartitionId))
+                .collect(toSet());
+        LocalTablePartitionStateRequest request = 
PARTITION_REPLICATION_MESSAGES_FACTORY.localTablePartitionStateRequest()
+                .zonePartitionIds(zoneMessage)
+                .catalogVersion(catalogVersion)
+                .build();
+
+        return messagingService.invoke(node, request, 
TimeUnit.SECONDS.toMillis(TIMEOUT_SECONDS))
+                .thenApply(networkMessage -> {
+                    assert networkMessage instanceof 
LocalTablePartitionStateResponse : networkMessage;
+
+                    return (LocalTablePartitionStateResponse) networkMessage;
+                });
+    }
+
+    private static Map<TablePartitionId, LocalPartitionStateMessageByNode> 
zoneStateToTableState(
             Map<ZonePartitionId, LocalPartitionStateMessageByNode> 
partitionStateMap,
+            Map<String, Map<TablePartitionIdMessage, Long>> tableState,
             Catalog catalog
     ) {
         Map<TablePartitionId, LocalPartitionStateMessageByNode> res = new 
HashMap<>();
 
         for (Map.Entry<ZonePartitionId, LocalPartitionStateMessageByNode> 
entry : partitionStateMap.entrySet()) {
-            int zoneId = entry.getKey().zoneId();
+            ZonePartitionId zonePartitionId = entry.getKey();
+
+            int zoneId = zonePartitionId.zoneId();
 
-            int partitionId = entry.getKey().partitionId();
+            int partitionId = zonePartitionId.partitionId();
 
             LocalPartitionStateMessageByNode 
zoneLocalPartitionStateMessageByNode = entry.getValue();
 
@@ -863,30 +966,27 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
             for (CatalogTableDescriptor tableDescriptor : 
catalog.tables(zoneId)) {
                 TablePartitionId tablePartitionId = new 
TablePartitionId(tableDescriptor.id(), partitionId);
 
+                TablePartitionIdMessage tablePartitionIdMessage =
+                        toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, 
tablePartitionId);
+
                 for (Map.Entry<String, LocalPartitionStateMessage> nodeEntry : 
zoneLocalPartitionStateMessageByNode.entrySet()) {
                     String nodeName = nodeEntry.getKey();
 
-                    LocalPartitionStateMessage localPartitionStateMessage = 
nodeEntry.getValue();
-
-                    TableViewInternal tableViewInternal = 
tableManager.cachedTable(tablePartitionId.tableId());
+                    Long estimatedRows = tableState.getOrDefault(nodeName, 
emptyMap())
+                            .get(tablePartitionIdMessage);
 
-                    if (tableViewInternal == null) {
+                    if (estimatedRows == null) {
                         continue;
                     }
 
-                    MvPartitionStorage partitionStorage = 
tableViewInternal.internalTable().storage()
-                            .getMvPartition(tablePartitionId.partitionId());
-
-                    if (partitionStorage == null) {
-                        continue;
-                    }
+                    LocalPartitionStateMessage localPartitionStateMessage = 
nodeEntry.getValue();
 
                     LocalPartitionStateMessage tableLocalPartitionStateMessage 
=
                             
PARTITION_REPLICATION_MESSAGES_FACTORY.localPartitionStateMessage()
-                                    
.partitionId(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, 
tablePartitionId))
+                                    .partitionId(tablePartitionIdMessage)
                                     .state(localPartitionStateMessage.state())
                                     
.logIndex(localPartitionStateMessage.logIndex())
-                                    
.estimatedRows(partitionStorage.estimatedSize())
+                                    .estimatedRows(estimatedRows)
                                     .build();
 
                     tableLocalPartitionStateMessageByNode.put(nodeName, 
tableLocalPartitionStateMessage);
@@ -1100,9 +1200,45 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
     private void handleMessage(NetworkMessage message, ClusterNode sender, 
@Nullable Long correlationId) {
         if (message instanceof LocalPartitionStatesRequest) {
             handleLocalPartitionStatesRequest((LocalPartitionStatesRequest) 
message, sender, correlationId);
+        } else if (message instanceof LocalTablePartitionStateRequest) {
+            handleLocalTableStateRequest((LocalTablePartitionStateRequest) 
message, sender, correlationId);
         }
     }
 
+    private void handleLocalTableStateRequest(LocalTablePartitionStateRequest 
request, ClusterNode sender, @Nullable Long correlationId) {
+        assert correlationId != null : "request=" + request + ", sender=" + 
sender;
+
+        int catalogVersion = request.catalogVersion();
+
+        Set<ZonePartitionId> requesedPartitions = 
request.zonePartitionIds().stream()
+                .map(ZonePartitionIdMessage::asZonePartitionId)
+                .collect(toSet());
+
+        catalogManager.catalogReadyFuture(catalogVersion).thenRunAsync(() -> {
+            Set<LocalTablePartitionStateMessage> statesList = new HashSet<>();
+
+            raftManager.forEach((raftNodeId, raftGroupService) -> {
+                if (raftNodeId.groupId() instanceof ZonePartitionId) {
+
+                    LocalTablePartitionStateMessage message = 
handleSizeRequestForTablesInZone(
+                            requesedPartitions,
+                            (ZonePartitionId) raftNodeId.groupId()
+                    );
+
+                    if (message != null) {
+                        statesList.add(message);
+                    }
+                }
+            });
+
+            LocalTablePartitionStateResponse response = 
PARTITION_REPLICATION_MESSAGES_FACTORY.localTablePartitionStateResponse()
+                    .states(statesList)
+                    .build();
+
+            messagingService.respond(sender, response, correlationId);
+        }, threadPool);
+    }
+
     private void handleLocalPartitionStatesRequest(LocalPartitionStatesRequest 
request, ClusterNode sender, @Nullable Long correlationId) {
         assert correlationId != null : "request=" + request + ", sender=" + 
sender;
 
@@ -1145,6 +1281,19 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
         }, threadPool);
     }
 
+    private @Nullable LocalTablePartitionStateMessage 
handleSizeRequestForTablesInZone(
+            Set<ZonePartitionId> requestedPartitions,
+            ZonePartitionId zonePartitionId
+    ) {
+        if (!containsOrEmpty(zonePartitionId, requestedPartitions)) {
+            return null;
+        }
+
+        return 
PARTITION_REPLICATION_MESSAGES_FACTORY.localTablePartitionStateMessage()
+                
.tablePartitionIdToEstimatedRowsMap(estimatedSizeMap(zonePartitionId))
+                .build();
+    }
+
     private @Nullable LocalPartitionStateMessage handleStateRequestForZone(
             LocalPartitionStatesRequest request,
             RaftGroupService raftGroupService,
@@ -1183,6 +1332,24 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
                 .sum();
     }
 
+    private Map<TablePartitionIdMessage, Long> 
estimatedSizeMap(ZonePartitionId zonePartitionId) {
+        Map<TablePartitionIdMessage, Long> partitionIdToEstimatedRowsMap = new 
HashMap<>();
+
+        for (TableImpl tableImpl : 
tableManager.zoneTables(zonePartitionId.zoneId())) {
+            MvPartitionStorage mvPartitionStorage = 
tableImpl.internalTable().storage().getMvPartition(zonePartitionId.partitionId());
+
+            if (mvPartitionStorage != null) {
+                partitionIdToEstimatedRowsMap.put(
+                        toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY,
+                                new TablePartitionId(tableImpl.tableId(), 
zonePartitionId.partitionId())),
+                        mvPartitionStorage.estimatedSize()
+                );
+            }
+        }
+
+        return partitionIdToEstimatedRowsMap;
+    }
+
     private @Nullable LocalPartitionStateMessage handleStateRequestForTable(
             LocalPartitionStatesRequest request,
             RaftGroupService raftGroupService,
@@ -1571,30 +1738,6 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
         return catalog;
     }
 
-    /**
-     * A helper method to get table states for the specified zone when 
colocation is enabled.
-     *
-     * @param zoneName Zone name.
-     * @return A collection of table states.
-     */
-    public Collection<TableState> zoneTablesStates(String zoneName) {
-        Catalog catalog = catalogLatestVersion();
-        int zoneId = zoneDescriptor(catalog, zoneName).id();
-
-        return tableManager.zoneTables(zoneId).stream()
-                .map(table -> {
-                    CatalogTableDescriptor tableDescriptor = 
catalog.table(table.tableId());
-                    String schemaName = 
catalog.schema(tableDescriptor.schemaId()).name();
-
-                    return new TableState(
-                            table.tableId(),
-                            table.qualifiedName().objectName(),
-                            schemaName
-                    );
-                })
-                .collect(toList());
-    }
-
     private static CatalogTableDescriptor tableDescriptor(Catalog catalog, 
String schemaName, String tableName) {
         CatalogTableDescriptor tableDescriptor = catalog.table(schemaName, 
tableName);
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionStateMessageByNode.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionStateMessageByNode.java
index b1ba02cad36..e5598d70213 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionStateMessageByNode.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionStateMessageByNode.java
@@ -60,6 +60,10 @@ public class LocalPartitionStateMessageByNode {
         return map.get(node);
     }
 
+    public Set<String> nodes() {
+        return map.keySet();
+    }
+
     @Override
     public String toString() {
         return map.toString();
diff --git 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
index 02eca4abbec..382c07b1ac0 100644
--- 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
+++ 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
@@ -26,6 +26,7 @@ import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.createZone;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.getDefaultZone;
 import static 
org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.aMapWithSize;
@@ -43,6 +44,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
 import org.apache.ignite.internal.app.IgniteImpl;
@@ -268,6 +270,86 @@ public class ItDisasterRecoveryManagerTest extends 
ClusterPerTestIntegrationTest
         assertThat(selectAll(), hasSize(4));
     }
 
+    @WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG, 
value = "false")
+    @Test
+    @ZoneParams(nodes = 2, replicas = 1, partitions = 2)
+    void testEstimatedRowsTable() throws Exception {
+        validateEstimatedRows();
+    }
+
+    @WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG, 
value = "true")
+    @Test
+    @ZoneParams(nodes = 2, replicas = 1, partitions = 2)
+    void testEstimatedRowsTableZone() throws Exception {
+        validateEstimatedRows();
+    }
+
+    private void validateEstimatedRows() throws InterruptedException {
+        IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
+
+        insert(0, 0);
+        insert(1, 1);
+
+        // Wait for replication to finish.
+        assertTrue(waitForCondition(() -> {
+                    CompletableFuture<Map<TablePartitionId, 
LocalTablePartitionStateByNode>> localStateTableFuture =
+                            
node.disasterRecoveryManager().localTablePartitionStates(emptySet(), 
emptySet(), emptySet());
+
+                    assertThat(localStateTableFuture, 
willCompleteSuccessfully());
+                    Map<TablePartitionId, LocalTablePartitionStateByNode> 
localState;
+                    try {
+                        localState = localStateTableFuture.get();
+                    } catch (InterruptedException | ExecutionException e) {
+                        throw new RuntimeException(e);
+                    }
+
+                    Set<Long> size = localState.values().stream()
+                            .flatMap(localTablePartitionStateByNode -> 
localTablePartitionStateByNode.values().stream())
+                            .map(state -> state.estimatedRows)
+                            .collect(Collectors.toSet());
+                    // There are 2 nodes, 2 partitions and 1 replica, so we 
should have 2 entries in localState (one for each partition),
+                    // LocalTablePartitionStateByNode should have a entry for 
either the first or the second node with 1 row.
+                    return size.size() == 1 && size.contains(1L) && 
localState.size() == 2;
+                },
+                20_000
+        ));
+    }
+
+    @WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG, 
value = "true")
+    @Test
+    @ZoneParams(nodes = 2, replicas = 1, partitions = 2)
+    void testEstimatedRowsZone() throws Exception {
+        IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
+
+        insert(0, 0);
+        insert(1, 1);
+
+        // Wait for replication to finish.
+        assertTrue(waitForCondition(() -> {
+                    CompletableFuture<Map<ZonePartitionId, 
LocalPartitionStateByNode>> localStateTableFuture =
+                            
node.disasterRecoveryManager().localPartitionStates(Set.of(ZONE_NAME), 
emptySet(), emptySet());
+
+                    assertThat(localStateTableFuture, 
willCompleteSuccessfully());
+
+                    Map<ZonePartitionId, LocalPartitionStateByNode> localState;
+                    try {
+                        localState = localStateTableFuture.get();
+                    } catch (InterruptedException | ExecutionException e) {
+                        throw new RuntimeException(e);
+                    }
+
+                    Set<Long> size = localState.values().stream()
+                            .flatMap(localTablePartitionStateByNode -> 
localTablePartitionStateByNode.values().stream())
+                            .map(state -> state.estimatedRows)
+                            .collect(Collectors.toSet());
+                    // There are 2 nodes, 2 partitions and 1 replica, so we 
should have 2 entries in localState (one for each partition),
+                    // LocalTablePartitionStateByNode should have a entry for 
either the first or the second node with 1 row.
+                    return size.size() == 1 && size.contains(1L) && 
localState.size() == 2;
+                },
+                20_000
+        ));
+    }
+
     @Test
     @ZoneParams(nodes = 2, replicas = 2, partitions = 2)
     void testLocalPartitionStateTable() throws Exception {


Reply via email to