This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ba41bed4926 IGNITE-25105 Support DisasterRecoveryManager functionality in System views for the Colocation track (#5735)
ba41bed4926 is described below

commit ba41bed4926473a89937a7f77770ac8bbed800cb
Author: Mirza Aliev <[email protected]>
AuthorDate: Mon May 12 18:49:20 2025 +0400

    IGNITE-25105 Support DisasterRecoveryManager functionality in System views for the Colocation track (#5735)
---
 .../disaster/DisasterRecoveryManager.java          |  97 ++++++++++++++++++-
 .../disaster/DisasterRecoverySystemViews.java      |  91 +++++++++++++++---
 .../disaster/ItDisasterRecoveryManagerTest.java    |   2 -
 .../disaster/ItDisasterRecoverySystemViewTest.java |  43 ++++-----
 ...ecoveryZonePartitionsStatesSystemViewTest.java} | 105 +++++++++------------
 5 files changed, 230 insertions(+), 108 deletions(-)

diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index 5c9c5c3f0b2..345a4f9e312 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -36,8 +36,10 @@ import static 
org.apache.ignite.internal.partition.replicator.network.disaster.L
 import static 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateEnum.HEALTHY;
 import static 
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
 import static 
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toZonePartitionIdMessage;
-import static 
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoverySystemViews.createGlobalPartitionStatesSystemView;
-import static 
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoverySystemViews.createLocalPartitionStatesSystemView;
+import static 
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoverySystemViews.createGlobalTablePartitionStatesSystemView;
+import static 
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoverySystemViews.createGlobalZonePartitionStatesSystemView;
+import static 
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoverySystemViews.createLocalTablePartitionStatesSystemView;
+import static 
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoverySystemViews.createLocalZonePartitionStatesSystemView;
 import static 
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.AVAILABLE;
 import static 
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.DEGRADED;
 import static 
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.READ_ONLY;
@@ -272,9 +274,18 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
 
     @Override
     public List<SystemView<?>> systemViews() {
+        if (enabledColocation()) {
+            return List.of(
+                    createGlobalZonePartitionStatesSystemView(this),
+                    createLocalZonePartitionStatesSystemView(this),
+                    createGlobalTablePartitionStatesSystemView(this),
+                    createLocalTablePartitionStatesSystemView(this)
+            );
+        }
+
         return List.of(
-                createGlobalPartitionStatesSystemView(this),
-                createLocalPartitionStatesSystemView(this)
+                createGlobalTablePartitionStatesSystemView(this),
+                createLocalTablePartitionStatesSystemView(this)
         );
     }
 
@@ -710,6 +721,18 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
         try {
             Catalog catalog = catalogLatestVersion();
 
+            if (enabledColocation()) {
+                return localPartitionStatesInternal(
+                        zoneNames,
+                        nodeNames,
+                        partitionIds,
+                        catalog,
+                        zoneState()
+                )
+                        .thenApply(res -> zoneStateToTableState(res, catalog))
+                        .thenApply(res -> normalizeTableLocal(res, catalog));
+            }
+
             return localPartitionStatesInternal(
                     zoneNames,
                     nodeNames,
@@ -737,6 +760,19 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
         try {
             Catalog catalog = catalogLatestVersion();
 
+            if (enabledColocation()) {
+                return localPartitionStatesInternal(
+                        zoneNames,
+                        Set.of(),
+                        partitionIds,
+                        catalog,
+                        zoneState()
+                )
+                        .thenApply(res -> zoneStateToTableState(res, catalog))
+                        .thenApply(res -> normalizeTableLocal(res, catalog))
+                        .thenApply(res -> assembleTableGlobal(res, 
partitionIds, catalog));
+            }
+
             return localPartitionStatesInternal(
                     zoneNames,
                     Set.of(),
@@ -751,6 +787,59 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
         }
     }
 
+    private Map<TablePartitionId, LocalPartitionStateMessageByNode> 
zoneStateToTableState(
+            Map<ZonePartitionId, LocalPartitionStateMessageByNode> 
partitionStateMap,
+            Catalog catalog
+    ) {
+        Map<TablePartitionId, LocalPartitionStateMessageByNode> res = new 
HashMap<>();
+
+        for (Map.Entry<ZonePartitionId, LocalPartitionStateMessageByNode> 
entry : partitionStateMap.entrySet()) {
+            int zoneId = entry.getKey().zoneId();
+
+            int partitionId = entry.getKey().partitionId();
+
+            LocalPartitionStateMessageByNode 
zoneLocalPartitionStateMessageByNode = entry.getValue();
+
+            LocalPartitionStateMessageByNode 
tableLocalPartitionStateMessageByNode = new 
LocalPartitionStateMessageByNode(new HashMap<>());
+
+            for (CatalogTableDescriptor tableDescriptor : 
catalog.tables(zoneId)) {
+                TablePartitionId tablePartitionId = new 
TablePartitionId(tableDescriptor.id(), partitionId);
+
+                for (Map.Entry<String, LocalPartitionStateMessage> nodeEntry : 
zoneLocalPartitionStateMessageByNode.entrySet()) {
+                    String nodeName = nodeEntry.getKey();
+
+                    LocalPartitionStateMessage localPartitionStateMessage = 
nodeEntry.getValue();
+
+                    TableViewInternal tableViewInternal = 
tableManager.cachedTable(tablePartitionId.tableId());
+
+                    if (tableViewInternal == null) {
+                        continue;
+                    }
+
+                    MvPartitionStorage partitionStorage = 
tableViewInternal.internalTable().storage()
+                            .getMvPartition(tablePartitionId.partitionId());
+
+                    if (partitionStorage == null) {
+                        continue;
+                    }
+
+                    LocalPartitionStateMessage tableLocalPartitionStateMessage 
=
+                            
PARTITION_REPLICATION_MESSAGES_FACTORY.localPartitionStateMessage()
+                                    
.partitionId(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, 
tablePartitionId))
+                                    .state(localPartitionStateMessage.state())
+                                    
.logIndex(localPartitionStateMessage.logIndex())
+                                    
.estimatedRows(partitionStorage.estimatedSize())
+                                    .build();
+
+                    tableLocalPartitionStateMessageByNode.put(nodeName, 
tableLocalPartitionStateMessage);
+                }
+                res.put(tablePartitionId, 
tableLocalPartitionStateMessageByNode);
+            }
+        }
+
+        return res;
+    }
+
     static Function<LocalPartitionStateMessage, TablePartitionId> tableState() 
{
         return state -> state.partitionId().asTablePartitionId();
     }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoverySystemViews.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoverySystemViews.java
index d3148a07106..47c0a53131a 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoverySystemViews.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoverySystemViews.java
@@ -35,15 +35,23 @@ import 
org.apache.ignite.internal.systemview.api.SystemViews;
 
 /** Helper class for disaster recovery system views. */
 class DisasterRecoverySystemViews {
-    private static final Comparator<GlobalTablePartitionState> 
GLOBAL_PARTITION_STATE_COMPARATOR =
+    private static final Comparator<GlobalTablePartitionState> 
GLOBAL_TABLE_PARTITION_STATE_COMPARATOR =
             comparing((GlobalTablePartitionState state) -> 
state.tableName).thenComparingInt(state -> state.partitionId);
 
-    private static final Comparator<SystemViewLocalPartitionState> 
SYSTEM_VIEW_LOCAL_PARTITION_STATE_COMPARATOR =
-            comparing((SystemViewLocalPartitionState state) -> 
state.state.tableName)
+    private static final Comparator<SystemViewLocalTablePartitionState> 
SYSTEM_VIEW_LOCAL_PARTITION_STATE_COMPARATOR =
+            comparing((SystemViewLocalTablePartitionState state) -> 
state.state.tableName)
                     .thenComparingInt(state -> state.state.partitionId)
                     .thenComparing(state -> state.nodeName);
 
-    static SystemView<?> 
createGlobalPartitionStatesSystemView(DisasterRecoveryManager manager) {
+    private static final Comparator<GlobalPartitionState> 
GLOBAL_ZONE_PARTITION_STATE_COMPARATOR =
+            comparing((GlobalPartitionState state) -> 
state.zoneName).thenComparingInt(state -> state.partitionId);
+
+    private static final Comparator<SystemViewLocalZonePartitionState> 
SYSTEM_VIEW_LOCAL_ZONE_PARTITION_STATE_COMPARATOR =
+            comparing((SystemViewLocalZonePartitionState state) -> 
state.state.zoneName)
+                    .thenComparingInt(state -> state.state.partitionId)
+                    .thenComparing(state -> state.nodeName);
+
+    static SystemView<?> 
createGlobalTablePartitionStatesSystemView(DisasterRecoveryManager manager) {
         return SystemViews.<GlobalTablePartitionState>clusterViewBuilder()
                 .name("GLOBAL_PARTITION_STATES")
                 .addColumn("ZONE_NAME", STRING, state -> state.zoneName)
@@ -58,12 +66,12 @@ class DisasterRecoverySystemViews {
                 //  They are kept for compatibility with 3.0 version, to allow 
columns being found by their old names.
                 .addColumn("STATE", STRING, state -> state.state.name())
                 // End of legacy columns list. New columns must be added below 
this line.
-                .dataProvider(systemViewPublisher(() -> 
globalPartitionStatesAsync(manager)))
+                .dataProvider(systemViewPublisher(() -> 
globalTablePartitionStatesAsync(manager)))
                 .build();
     }
 
-    static SystemView<?> 
createLocalPartitionStatesSystemView(DisasterRecoveryManager manager) {
-        return SystemViews.<SystemViewLocalPartitionState>clusterViewBuilder()
+    static SystemView<?> 
createLocalTablePartitionStatesSystemView(DisasterRecoveryManager manager) {
+        return 
SystemViews.<SystemViewLocalTablePartitionState>clusterViewBuilder()
                 .name("LOCAL_PARTITION_STATES")
                 .addColumn("NODE_NAME", STRING, state -> state.nodeName)
                 .addColumn("ZONE_NAME", STRING, state -> state.state.zoneName)
@@ -79,7 +87,31 @@ class DisasterRecoverySystemViews {
                 //  They are kept for compatibility with 3.0 version, to allow 
columns being found by their old names.
                 .addColumn("STATE", STRING, state -> state.state.state.name())
                 // End of legacy columns list. New columns must be added below 
this line.
-                .dataProvider(systemViewPublisher(() -> 
localPartitionStatesAsync(manager)))
+                .dataProvider(systemViewPublisher(() -> 
localTablePartitionStatesAsync(manager)))
+                .build();
+    }
+
+    static SystemView<?> 
createGlobalZonePartitionStatesSystemView(DisasterRecoveryManager manager) {
+        return SystemViews.<GlobalPartitionState>clusterViewBuilder()
+                .name("GLOBAL_ZONE_PARTITION_STATES")
+                .addColumn("ZONE_NAME", STRING, state -> state.zoneName)
+                .addColumn("PARTITION_ID", INT32, state -> state.partitionId)
+                .addColumn("PARTITION_STATE", STRING, state -> 
state.state.name())
+                .addColumn("ZONE_ID", INT32, state -> state.zoneId)
+                .dataProvider(systemViewPublisher(() -> 
globalZonePartitionStatesAsync(manager)))
+                .build();
+    }
+
+    static SystemView<?> 
createLocalZonePartitionStatesSystemView(DisasterRecoveryManager manager) {
+        return 
SystemViews.<SystemViewLocalZonePartitionState>clusterViewBuilder()
+                .name("LOCAL_ZONE_PARTITION_STATES")
+                .addColumn("NODE_NAME", STRING, state -> state.nodeName)
+                .addColumn("ZONE_NAME", STRING, state -> state.state.zoneName)
+                .addColumn("PARTITION_ID", INT32, state -> 
state.state.partitionId)
+                .addColumn("PARTITION_STATE", STRING, state -> 
state.state.state.name())
+                .addColumn("ESTIMATED_ROWS", INT64, state -> 
state.state.estimatedRows)
+                .addColumn("ZONE_ID", INT32, state -> state.state.zoneId)
+                .dataProvider(systemViewPublisher(() -> 
localZonePartitionStatesAsync(manager)))
                 .build();
     }
 
@@ -91,28 +123,59 @@ class DisasterRecoverySystemViews {
         };
     }
 
-    private static CompletableFuture<Iterator<GlobalTablePartitionState>> 
globalPartitionStatesAsync(DisasterRecoveryManager manager) {
+    private static CompletableFuture<Iterator<GlobalTablePartitionState>> 
globalTablePartitionStatesAsync(DisasterRecoveryManager manager) {
         return manager.globalTablePartitionStates(Set.of(), 
Set.of()).thenApply(states -> states.values().stream()
-                .sorted(GLOBAL_PARTITION_STATE_COMPARATOR)
+                .sorted(GLOBAL_TABLE_PARTITION_STATE_COMPARATOR)
                 .iterator()
         );
     }
 
-    private static CompletableFuture<Iterator<SystemViewLocalPartitionState>> 
localPartitionStatesAsync(DisasterRecoveryManager manager) {
+    private static 
CompletableFuture<Iterator<SystemViewLocalTablePartitionState>> 
localTablePartitionStatesAsync(
+            DisasterRecoveryManager manager
+    ) {
         return manager.localTablePartitionStates(Set.of(), Set.of(), 
Set.of()).thenApply(states -> states.values().stream()
                 .flatMap(statesByNodeName -> 
statesByNodeName.entrySet().stream())
-                .map(nodeStates -> new 
SystemViewLocalPartitionState(nodeStates.getKey(), nodeStates.getValue()))
+                .map(nodeStates -> new 
SystemViewLocalTablePartitionState(nodeStates.getKey(), nodeStates.getValue()))
                 .sorted(SYSTEM_VIEW_LOCAL_PARTITION_STATE_COMPARATOR)
                 .iterator()
         );
     }
 
-    private static class SystemViewLocalPartitionState {
+    private static CompletableFuture<Iterator<GlobalPartitionState>> 
globalZonePartitionStatesAsync(DisasterRecoveryManager manager) {
+        return manager.globalPartitionStates(Set.of(), 
Set.of()).thenApply(states -> states.values().stream()
+                .sorted(GLOBAL_ZONE_PARTITION_STATE_COMPARATOR)
+                .iterator()
+        );
+    }
+
+    private static 
CompletableFuture<Iterator<SystemViewLocalZonePartitionState>> 
localZonePartitionStatesAsync(
+            DisasterRecoveryManager manager
+    ) {
+        return manager.localPartitionStates(Set.of(), Set.of(), 
Set.of()).thenApply(states -> states.values().stream()
+                .flatMap(statesByNodeName -> 
statesByNodeName.entrySet().stream())
+                .map(nodeStates -> new 
SystemViewLocalZonePartitionState(nodeStates.getKey(), nodeStates.getValue()))
+                .sorted(SYSTEM_VIEW_LOCAL_ZONE_PARTITION_STATE_COMPARATOR)
+                .iterator()
+        );
+    }
+
+    private static class SystemViewLocalTablePartitionState {
         private final String nodeName;
 
         private final LocalTablePartitionState state;
 
-        private SystemViewLocalPartitionState(String nodeName, 
LocalTablePartitionState state) {
+        private SystemViewLocalTablePartitionState(String nodeName, 
LocalTablePartitionState state) {
+            this.nodeName = nodeName;
+            this.state = state;
+        }
+    }
+
+    private static class SystemViewLocalZonePartitionState {
+        private final String nodeName;
+
+        private final LocalPartitionState state;
+
+        private SystemViewLocalZonePartitionState(String nodeName, 
LocalPartitionState state) {
             this.nodeName = nodeName;
             this.state = state;
         }
diff --git 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
index db6c33d1488..109151400c8 100644
--- 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
+++ 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
@@ -157,7 +157,6 @@ public class ItDisasterRecoveryManagerTest extends 
ClusterPerTestIntegrationTest
     }
 
     @Test
-    @WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "false")
     @ZoneParams(nodes = 2, replicas = 2, partitions = 2)
     void testLocalPartitionStateTable() throws Exception {
         IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
@@ -236,7 +235,6 @@ public class ItDisasterRecoveryManagerTest extends 
ClusterPerTestIntegrationTest
     }
 
     @Test
-    @WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "false")
     @ZoneParams(nodes = 2, replicas = 2, partitions = 2)
     void testGlobalPartitionStateTable() throws Exception {
         IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
diff --git 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java
 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java
index 2266276707c..8e3cec3ac82 100644
--- 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java
+++ 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java
@@ -19,8 +19,8 @@ package org.apache.ignite.internal.disaster;
 
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
-import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
-import static 
org.apache.ignite.internal.lang.IgniteSystemProperties.COLOCATION_FEATURE_FLAG;
+import static 
org.apache.ignite.internal.disaster.ItDisasterRecoveryZonePartitionsStatesSystemViewTest.estimatedSize;
+import static 
org.apache.ignite.internal.lang.IgniteSystemProperties.enabledColocation;
 import static 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateEnum.HEALTHY;
 import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME;
 import static 
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.AVAILABLE;
@@ -31,7 +31,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.List;
-import java.util.Objects;
 import java.util.stream.IntStream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.app.IgniteImpl;
@@ -40,17 +39,13 @@ import 
org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
 import org.apache.ignite.internal.sql.engine.util.MetadataMatcher;
-import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.distributed.PublicApiThreadingTable;
-import org.apache.ignite.internal.testframework.WithSystemProperty;
 import org.apache.ignite.sql.ColumnType;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 
 /** For integration testing of disaster recovery system views. */
-// TODO https://issues.apache.org/jira/browse/IGNITE-25105
-@WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "false")
 public class ItDisasterRecoverySystemViewTest extends BaseSqlIntegrationTest {
     /** Table name. */
     public static final String TABLE_NAME = "TEST_TABLE";
@@ -118,7 +113,11 @@ public class ItDisasterRecoverySystemViewTest extends 
BaseSqlIntegrationTest {
 
         createZoneAndTable(ZONE_NAME, TABLE_NAME, initialNodes(), 
partitionsCount);
 
-        waitLeaderOnAllPartitions(TABLE_NAME, partitionsCount);
+        if (enabledColocation()) {
+            
ItDisasterRecoveryZonePartitionsStatesSystemViewTest.waitLeaderOnAllPartitions(ZONE_NAME,
 partitionsCount);
+        } else {
+            waitLeaderOnAllPartitions(TABLE_NAME, partitionsCount);
+        }
 
         int tableId = getTableId(DEFAULT_SCHEMA_NAME, TABLE_NAME);
 
@@ -136,7 +135,11 @@ public class ItDisasterRecoverySystemViewTest extends 
BaseSqlIntegrationTest {
 
         createZoneAndTable(ZONE_NAME, TABLE_NAME, initialNodes(), 
partitionsCount);
 
-        waitLeaderOnAllPartitions(TABLE_NAME, partitionsCount);
+        if (enabledColocation()) {
+            
ItDisasterRecoveryZonePartitionsStatesSystemViewTest.waitLeaderOnAllPartitions(ZONE_NAME,
 partitionsCount);
+        } else {
+            waitLeaderOnAllPartitions(TABLE_NAME, partitionsCount);
+        }
 
         List<String> nodeNames = 
CLUSTER.runningNodes().map(Ignite::name).sorted().collect(toList());
 
@@ -161,7 +164,11 @@ public class ItDisasterRecoverySystemViewTest extends 
BaseSqlIntegrationTest {
 
         createZoneAndTable(ZONE_NAME, TABLE_NAME, initialNodes(), 
partitionsCount);
 
-        waitLeaderOnAllPartitions(TABLE_NAME, partitionsCount);
+        if (enabledColocation()) {
+            
ItDisasterRecoveryZonePartitionsStatesSystemViewTest.waitLeaderOnAllPartitions(ZONE_NAME,
 partitionsCount);
+        } else {
+            waitLeaderOnAllPartitions(TABLE_NAME, partitionsCount);
+        }
 
         insertPeople(
                 TABLE_NAME,
@@ -176,7 +183,7 @@ public class ItDisasterRecoverySystemViewTest extends 
BaseSqlIntegrationTest {
         // Small wait is specially added so that the follower can execute the 
replicated "insert" command and the counter is honestly
         // increased.
         assertTrue(waitForCondition(
-                () -> nodeNames.stream().allMatch(nodeName -> 
estimatedSize(nodeName, TABLE_NAME, 0) >= 2L),
+                () -> nodeNames.stream().allMatch(nodeName -> 
estimatedSize(nodeName, TABLE_NAME, 0, CLUSTER) >= 2L),
                 10,
                 1_000
         ));
@@ -226,18 +233,4 @@ public class ItDisasterRecoverySystemViewTest extends 
BaseSqlIntegrationTest {
 
         return 
catalogManager.catalog(catalogManager.latestCatalogVersion()).table(schemaName, 
tableName).id();
     }
-
-    private static long estimatedSize(String nodeName, String tableName, int 
partitionId) {
-        return CLUSTER.runningNodes()
-                .filter(ignite -> nodeName.equals(ignite.name()))
-                .map(ignite -> {
-                    TableImpl table = 
unwrapTableImpl(ignite.tables().table(tableName));
-
-                    return 
table.internalTable().storage().getMvPartition(partitionId);
-                })
-                .filter(Objects::nonNull)
-                .map(MvPartitionStorage::estimatedSize)
-                .findAny()
-                .orElse(-1L);
-    }
 }
diff --git 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java
 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryZonePartitionsStatesSystemViewTest.java
similarity index 61%
copy from 
modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java
copy to 
modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryZonePartitionsStatesSystemViewTest.java
index 2266276707c..83415a0f985 100644
--- 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java
+++ 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryZonePartitionsStatesSystemViewTest.java
@@ -22,7 +22,6 @@ import static 
org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
 import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
 import static 
org.apache.ignite.internal.lang.IgniteSystemProperties.COLOCATION_FEATURE_FLAG;
 import static 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateEnum.HEALTHY;
-import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME;
 import static 
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.AVAILABLE;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -34,24 +33,23 @@ import java.util.List;
 import java.util.Objects;
 import java.util.stream.IntStream;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.Cluster;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
-import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
 import org.apache.ignite.internal.sql.engine.util.MetadataMatcher;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.table.TableImpl;
-import org.apache.ignite.internal.table.distributed.PublicApiThreadingTable;
 import org.apache.ignite.internal.testframework.WithSystemProperty;
 import org.apache.ignite.sql.ColumnType;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 
 /** For integration testing of disaster recovery system views. */
-// TODO https://issues.apache.org/jira/browse/IGNITE-25105
-@WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "false")
-public class ItDisasterRecoverySystemViewTest extends BaseSqlIntegrationTest {
+@WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "true")
+public class ItDisasterRecoveryZonePartitionsStatesSystemViewTest extends 
BaseSqlIntegrationTest {
     /** Table name. */
     public static final String TABLE_NAME = "TEST_TABLE";
 
@@ -70,61 +68,43 @@ public class ItDisasterRecoverySystemViewTest extends 
BaseSqlIntegrationTest {
 
     @Test
     public void globalPatitionStatesMetadata() {
-        assertQuery("SELECT * FROM SYSTEM.GLOBAL_PARTITION_STATES")
+        assertQuery("SELECT * FROM SYSTEM.GLOBAL_ZONE_PARTITION_STATES")
                 .columnMetadata(
                         new 
MetadataMatcher().name("ZONE_NAME").type(ColumnType.STRING).nullable(true),
-                        new 
MetadataMatcher().name("TABLE_ID").type(ColumnType.INT32).nullable(true),
-                        new 
MetadataMatcher().name("SCHEMA_NAME").type(ColumnType.STRING).nullable(true),
-                        new 
MetadataMatcher().name("TABLE_NAME").type(ColumnType.STRING).nullable(true),
                         new 
MetadataMatcher().name("PARTITION_ID").type(ColumnType.INT32).nullable(true),
                         new 
MetadataMatcher().name("PARTITION_STATE").type(ColumnType.STRING).nullable(true),
-                        new 
MetadataMatcher().name("ZONE_ID").type(ColumnType.INT32).nullable(true),
-                        new 
MetadataMatcher().name("SCHEMA_ID").type(ColumnType.INT32).nullable(true),
-                        // Legacy columns.
-                        new 
MetadataMatcher().name("STATE").type(ColumnType.STRING).nullable(true)
+                        new 
MetadataMatcher().name("ZONE_ID").type(ColumnType.INT32).nullable(true)
                 )
                 .check();
     }
 
     @Test
     public void localPatitionStatesMetadata() {
-        assertQuery("SELECT * FROM SYSTEM.LOCAL_PARTITION_STATES")
+        assertQuery("SELECT * FROM SYSTEM.LOCAL_ZONE_PARTITION_STATES")
                 .columnMetadata(
                         new 
MetadataMatcher().name("NODE_NAME").type(ColumnType.STRING).nullable(true),
                         new 
MetadataMatcher().name("ZONE_NAME").type(ColumnType.STRING).nullable(true),
-                        new 
MetadataMatcher().name("TABLE_ID").type(ColumnType.INT32).nullable(true),
-                        new 
MetadataMatcher().name("SCHEMA_NAME").type(ColumnType.STRING).nullable(true),
-                        new 
MetadataMatcher().name("TABLE_NAME").type(ColumnType.STRING).nullable(true),
                         new 
MetadataMatcher().name("PARTITION_ID").type(ColumnType.INT32).nullable(true),
                         new 
MetadataMatcher().name("PARTITION_STATE").type(ColumnType.STRING).nullable(true),
                         new 
MetadataMatcher().name("ESTIMATED_ROWS").type(ColumnType.INT64).nullable(true),
-                        new 
MetadataMatcher().name("ZONE_ID").type(ColumnType.INT32).nullable(true),
-                        new 
MetadataMatcher().name("SCHEMA_ID").type(ColumnType.INT32).nullable(true),
-                        // Legacy columns.
-                        new 
MetadataMatcher().name("STATE").type(ColumnType.STRING).nullable(true)
+                        new 
MetadataMatcher().name("ZONE_ID").type(ColumnType.INT32).nullable(true)
                 )
                 .check();
     }
 
-    @Test
-    void testNoZonesAndTables() {
-        
assertQuery(globalPartitionStatesSystemViewSql()).returnNothing().check();
-        
assertQuery(localPartitionStatesSystemViewSql()).returnNothing().check();
-    }
-
     @Test
     void testGlobalPartitionStatesSystemView() {
         int partitionsCount = 2;
 
         createZoneAndTable(ZONE_NAME, TABLE_NAME, initialNodes(), 
partitionsCount);
 
-        waitLeaderOnAllPartitions(TABLE_NAME, partitionsCount);
+        waitLeaderOnAllPartitions(ZONE_NAME, partitionsCount);
 
-        int tableId = getTableId(DEFAULT_SCHEMA_NAME, TABLE_NAME);
+        int zoneId = getZoneId(ZONE_NAME);
 
-        assertQuery(globalPartitionStatesSystemViewSql())
-                .returns(ZONE_NAME, tableId, DEFAULT_SCHEMA_NAME, TABLE_NAME, 
0, AVAILABLE.name())
-                .returns(ZONE_NAME, tableId, DEFAULT_SCHEMA_NAME, TABLE_NAME, 
1, AVAILABLE.name())
+        assertQuery(globalZonePartitionStatesSystemViewSql())
+                .returns(ZONE_NAME, 0, AVAILABLE.name(), zoneId)
+                .returns(ZONE_NAME, 1, AVAILABLE.name(), zoneId)
                 .check();
     }
 
@@ -136,20 +116,20 @@ public class ItDisasterRecoverySystemViewTest extends 
BaseSqlIntegrationTest {
 
         createZoneAndTable(ZONE_NAME, TABLE_NAME, initialNodes(), 
partitionsCount);
 
-        waitLeaderOnAllPartitions(TABLE_NAME, partitionsCount);
+        waitLeaderOnAllPartitions(ZONE_NAME, partitionsCount);
 
         List<String> nodeNames = 
CLUSTER.runningNodes().map(Ignite::name).sorted().collect(toList());
 
         String nodeName0 = nodeNames.get(0);
         String nodeName1 = nodeNames.get(1);
 
-        int tableId = getTableId(DEFAULT_SCHEMA_NAME, TABLE_NAME);
+        int zoneId = getZoneId(ZONE_NAME);
 
-        assertQuery(localPartitionStatesSystemViewSql())
-                .returns(nodeName0, ZONE_NAME, tableId, DEFAULT_SCHEMA_NAME, 
TABLE_NAME, 0, HEALTHY.name(), 0L)
-                .returns(nodeName0, ZONE_NAME, tableId, DEFAULT_SCHEMA_NAME, 
TABLE_NAME, 1, HEALTHY.name(), 0L)
-                .returns(nodeName1, ZONE_NAME, tableId, DEFAULT_SCHEMA_NAME, 
TABLE_NAME, 0, HEALTHY.name(), 0L)
-                .returns(nodeName1, ZONE_NAME, tableId, DEFAULT_SCHEMA_NAME, 
TABLE_NAME, 1, HEALTHY.name(), 0L)
+        assertQuery(localZonePartitionStatesSystemViewSql())
+                .returns(nodeName0, ZONE_NAME, 0, HEALTHY.name(), 0L, zoneId)
+                .returns(nodeName0, ZONE_NAME, 1, HEALTHY.name(), 0L, zoneId)
+                .returns(nodeName1, ZONE_NAME, 0, HEALTHY.name(), 0L, zoneId)
+                .returns(nodeName1, ZONE_NAME, 1, HEALTHY.name(), 0L, zoneId)
                 .check();
     }
 
@@ -161,7 +141,7 @@ public class ItDisasterRecoverySystemViewTest extends 
BaseSqlIntegrationTest {
 
         createZoneAndTable(ZONE_NAME, TABLE_NAME, initialNodes(), 
partitionsCount);
 
-        waitLeaderOnAllPartitions(TABLE_NAME, partitionsCount);
+        waitLeaderOnAllPartitions(ZONE_NAME, partitionsCount);
 
         insertPeople(
                 TABLE_NAME,
@@ -171,64 +151,63 @@ public class ItDisasterRecoverySystemViewTest extends 
BaseSqlIntegrationTest {
 
         List<String> nodeNames = 
CLUSTER.runningNodes().map(Ignite::name).sorted().collect(toList());
 
-        int tableId = getTableId(DEFAULT_SCHEMA_NAME, TABLE_NAME);
-
         // Small wait is specially added so that the follower can execute the 
replicated "insert" command and the counter is honestly
         // increased.
         assertTrue(waitForCondition(
-                () -> nodeNames.stream().allMatch(nodeName -> 
estimatedSize(nodeName, TABLE_NAME, 0) >= 2L),
+                () -> nodeNames.stream().allMatch(nodeName -> 
estimatedSize(nodeName, TABLE_NAME, 0, CLUSTER) >= 2L),
                 10,
                 1_000
         ));
 
-        assertQuery(localPartitionStatesSystemViewSql())
-                .returns(nodeNames.get(0), ZONE_NAME, tableId, 
DEFAULT_SCHEMA_NAME, TABLE_NAME, 0, HEALTHY.name(), 2L)
-                .returns(nodeNames.get(1), ZONE_NAME, tableId, 
DEFAULT_SCHEMA_NAME, TABLE_NAME, 0, HEALTHY.name(), 2L)
+        int zoneId = getZoneId(ZONE_NAME);
+
+        assertQuery(localZonePartitionStatesSystemViewSql())
+                .returns(nodeNames.get(0), ZONE_NAME, 0, HEALTHY.name(), 2L, 
zoneId)
+                .returns(nodeNames.get(1), ZONE_NAME, 0, HEALTHY.name(), 2L, 
zoneId)
                 .check();
     }
 
     /**
-     * waiting a leader for all partitions because later we expect that 
partitions will be in AVAILABLE state. Without it there won't be
+     * Waiting a leader for all partitions because later we expect that 
partitions will be in AVAILABLE state. Without it there won't be
      * log updating (see {@code LocalPartitionStateEnumWithLogIndex#of}) and 
then in SYSTEM.*_PARTITION_STATES we will get UNAVAILABLE state
      * instead of the desired one. That's why in {@link 
#testGlobalPartitionStatesSystemView()} and
      * {@link #testLocalPartitionStatesSystemView()} we must manually trigger 
{@link RaftGroupService#refreshLeader()} that will lead
      * partitions to the proper states.
      *
-     * @param tableName A table whose partitions will do a leader refresh.
+     * @param zoneName A zone whose partitions will do a leader refresh.
      * @param partitionsCount Expected the table partitions count for 
iterating over them.
      */
-    private static void waitLeaderOnAllPartitions(String tableName, int 
partitionsCount) {
+    static void waitLeaderOnAllPartitions(String zoneName, int 
partitionsCount) {
         IgniteImpl node = unwrapIgniteImpl(CLUSTER.node(0));
 
-        TableImpl table = ((PublicApiThreadingTable) 
node.tables().table(tableName)).unwrap(TableImpl.class);
-
-        int tableId = table.tableId();
+        int zoneId = getZoneId(zoneName);
 
         IntStream.range(0, partitionsCount).forEach(partId -> assertThat(
                 node.replicaManager()
-                        .replica(new TablePartitionId(tableId, partId))
+                        .replica(new ZonePartitionId(zoneId, partId))
                         .thenCompose(replica -> 
replica.raftClient().refreshLeader()),
                 willCompleteSuccessfully()
         ));
     }
 
-    private static String globalPartitionStatesSystemViewSql() {
-        return "SELECT ZONE_NAME, TABLE_ID, SCHEMA_NAME, TABLE_NAME, 
PARTITION_ID, STATE FROM SYSTEM.GLOBAL_PARTITION_STATES";
+    private static String globalZonePartitionStatesSystemViewSql() {
+        return "SELECT ZONE_NAME, PARTITION_ID, PARTITION_STATE, ZONE_ID FROM 
SYSTEM.GLOBAL_ZONE_PARTITION_STATES "
+                + "WHERE ZONE_NAME != 'Default'";
     }
 
-    private static String localPartitionStatesSystemViewSql() {
-        return "SELECT NODE_NAME, ZONE_NAME, TABLE_ID, SCHEMA_NAME, 
TABLE_NAME, PARTITION_ID, STATE, ESTIMATED_ROWS"
-                + " FROM SYSTEM.LOCAL_PARTITION_STATES";
+    private static String localZonePartitionStatesSystemViewSql() {
+        return "SELECT NODE_NAME, ZONE_NAME, PARTITION_ID, PARTITION_STATE, 
ESTIMATED_ROWS, ZONE_ID"
+                + " FROM SYSTEM.LOCAL_ZONE_PARTITION_STATES WHERE ZONE_NAME != 
'Default'";
     }
 
-    private static int getTableId(String schemaName, String tableName) {
+    private static int getZoneId(String zoneName) {
         CatalogManager catalogManager = 
unwrapIgniteImpl(CLUSTER.aliveNode()).catalogManager();
 
-        return 
catalogManager.catalog(catalogManager.latestCatalogVersion()).table(schemaName, 
tableName).id();
+        return 
catalogManager.catalog(catalogManager.latestCatalogVersion()).zone(zoneName).id();
     }
 
-    private static long estimatedSize(String nodeName, String tableName, int 
partitionId) {
-        return CLUSTER.runningNodes()
+    static long estimatedSize(String nodeName, String tableName, int 
partitionId, Cluster cluster) {
+        return cluster.runningNodes()
                 .filter(ignite -> nodeName.equals(ignite.name()))
                 .map(ignite -> {
                     TableImpl table = 
unwrapTableImpl(ignite.tables().table(tableName));


Reply via email to