This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ba41bed4926 IGNITE-25105 Support DisasterRecoveryManager functionality
in System views for the Colocation track (#5735)
ba41bed4926 is described below
commit ba41bed4926473a89937a7f77770ac8bbed800cb
Author: Mirza Aliev <[email protected]>
AuthorDate: Mon May 12 18:49:20 2025 +0400
IGNITE-25105 Support DisasterRecoveryManager functionality in System views
for the Colocation track (#5735)
---
.../disaster/DisasterRecoveryManager.java | 97 ++++++++++++++++++-
.../disaster/DisasterRecoverySystemViews.java | 91 +++++++++++++++---
.../disaster/ItDisasterRecoveryManagerTest.java | 2 -
.../disaster/ItDisasterRecoverySystemViewTest.java | 43 ++++-----
...ecoveryZonePartitionsStatesSystemViewTest.java} | 105 +++++++++------------
5 files changed, 230 insertions(+), 108 deletions(-)
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index 5c9c5c3f0b2..345a4f9e312 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -36,8 +36,10 @@ import static
org.apache.ignite.internal.partition.replicator.network.disaster.L
import static
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateEnum.HEALTHY;
import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toZonePartitionIdMessage;
-import static
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoverySystemViews.createGlobalPartitionStatesSystemView;
-import static
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoverySystemViews.createLocalPartitionStatesSystemView;
+import static
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoverySystemViews.createGlobalTablePartitionStatesSystemView;
+import static
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoverySystemViews.createGlobalZonePartitionStatesSystemView;
+import static
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoverySystemViews.createLocalTablePartitionStatesSystemView;
+import static
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoverySystemViews.createLocalZonePartitionStatesSystemView;
import static
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.AVAILABLE;
import static
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.DEGRADED;
import static
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.READ_ONLY;
@@ -272,9 +274,18 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
@Override
public List<SystemView<?>> systemViews() {
+ if (enabledColocation()) {
+ return List.of(
+ createGlobalZonePartitionStatesSystemView(this),
+ createLocalZonePartitionStatesSystemView(this),
+ createGlobalTablePartitionStatesSystemView(this),
+ createLocalTablePartitionStatesSystemView(this)
+ );
+ }
+
return List.of(
- createGlobalPartitionStatesSystemView(this),
- createLocalPartitionStatesSystemView(this)
+ createGlobalTablePartitionStatesSystemView(this),
+ createLocalTablePartitionStatesSystemView(this)
);
}
@@ -710,6 +721,18 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
try {
Catalog catalog = catalogLatestVersion();
+ if (enabledColocation()) {
+ return localPartitionStatesInternal(
+ zoneNames,
+ nodeNames,
+ partitionIds,
+ catalog,
+ zoneState()
+ )
+ .thenApply(res -> zoneStateToTableState(res, catalog))
+ .thenApply(res -> normalizeTableLocal(res, catalog));
+ }
+
return localPartitionStatesInternal(
zoneNames,
nodeNames,
@@ -737,6 +760,19 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
try {
Catalog catalog = catalogLatestVersion();
+ if (enabledColocation()) {
+ return localPartitionStatesInternal(
+ zoneNames,
+ Set.of(),
+ partitionIds,
+ catalog,
+ zoneState()
+ )
+ .thenApply(res -> zoneStateToTableState(res, catalog))
+ .thenApply(res -> normalizeTableLocal(res, catalog))
+ .thenApply(res -> assembleTableGlobal(res,
partitionIds, catalog));
+ }
+
return localPartitionStatesInternal(
zoneNames,
Set.of(),
@@ -751,6 +787,59 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
}
}
+ private Map<TablePartitionId, LocalPartitionStateMessageByNode>
zoneStateToTableState(
+ Map<ZonePartitionId, LocalPartitionStateMessageByNode>
partitionStateMap,
+ Catalog catalog
+ ) {
+ Map<TablePartitionId, LocalPartitionStateMessageByNode> res = new
HashMap<>();
+
+ for (Map.Entry<ZonePartitionId, LocalPartitionStateMessageByNode>
entry : partitionStateMap.entrySet()) {
+ int zoneId = entry.getKey().zoneId();
+
+ int partitionId = entry.getKey().partitionId();
+
+ LocalPartitionStateMessageByNode
zoneLocalPartitionStateMessageByNode = entry.getValue();
+
+ LocalPartitionStateMessageByNode
tableLocalPartitionStateMessageByNode = new
LocalPartitionStateMessageByNode(new HashMap<>());
+
+ for (CatalogTableDescriptor tableDescriptor :
catalog.tables(zoneId)) {
+ TablePartitionId tablePartitionId = new
TablePartitionId(tableDescriptor.id(), partitionId);
+
+ for (Map.Entry<String, LocalPartitionStateMessage> nodeEntry :
zoneLocalPartitionStateMessageByNode.entrySet()) {
+ String nodeName = nodeEntry.getKey();
+
+ LocalPartitionStateMessage localPartitionStateMessage =
nodeEntry.getValue();
+
+ TableViewInternal tableViewInternal =
tableManager.cachedTable(tablePartitionId.tableId());
+
+ if (tableViewInternal == null) {
+ continue;
+ }
+
+ MvPartitionStorage partitionStorage =
tableViewInternal.internalTable().storage()
+ .getMvPartition(tablePartitionId.partitionId());
+
+ if (partitionStorage == null) {
+ continue;
+ }
+
+ LocalPartitionStateMessage tableLocalPartitionStateMessage
=
+
PARTITION_REPLICATION_MESSAGES_FACTORY.localPartitionStateMessage()
+
.partitionId(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY,
tablePartitionId))
+ .state(localPartitionStateMessage.state())
+
.logIndex(localPartitionStateMessage.logIndex())
+
.estimatedRows(partitionStorage.estimatedSize())
+ .build();
+
+ tableLocalPartitionStateMessageByNode.put(nodeName,
tableLocalPartitionStateMessage);
+ }
+ res.put(tablePartitionId,
tableLocalPartitionStateMessageByNode);
+ }
+ }
+
+ return res;
+ }
+
static Function<LocalPartitionStateMessage, TablePartitionId> tableState()
{
return state -> state.partitionId().asTablePartitionId();
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoverySystemViews.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoverySystemViews.java
index d3148a07106..47c0a53131a 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoverySystemViews.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoverySystemViews.java
@@ -35,15 +35,23 @@ import
org.apache.ignite.internal.systemview.api.SystemViews;
/** Helper class for disaster recovery system views. */
class DisasterRecoverySystemViews {
- private static final Comparator<GlobalTablePartitionState>
GLOBAL_PARTITION_STATE_COMPARATOR =
+ private static final Comparator<GlobalTablePartitionState>
GLOBAL_TABLE_PARTITION_STATE_COMPARATOR =
comparing((GlobalTablePartitionState state) ->
state.tableName).thenComparingInt(state -> state.partitionId);
- private static final Comparator<SystemViewLocalPartitionState>
SYSTEM_VIEW_LOCAL_PARTITION_STATE_COMPARATOR =
- comparing((SystemViewLocalPartitionState state) ->
state.state.tableName)
+ private static final Comparator<SystemViewLocalTablePartitionState>
SYSTEM_VIEW_LOCAL_PARTITION_STATE_COMPARATOR =
+ comparing((SystemViewLocalTablePartitionState state) ->
state.state.tableName)
.thenComparingInt(state -> state.state.partitionId)
.thenComparing(state -> state.nodeName);
- static SystemView<?>
createGlobalPartitionStatesSystemView(DisasterRecoveryManager manager) {
+ private static final Comparator<GlobalPartitionState>
GLOBAL_ZONE_PARTITION_STATE_COMPARATOR =
+ comparing((GlobalPartitionState state) ->
state.zoneName).thenComparingInt(state -> state.partitionId);
+
+ private static final Comparator<SystemViewLocalZonePartitionState>
SYSTEM_VIEW_LOCAL_ZONE_PARTITION_STATE_COMPARATOR =
+ comparing((SystemViewLocalZonePartitionState state) ->
state.state.zoneName)
+ .thenComparingInt(state -> state.state.partitionId)
+ .thenComparing(state -> state.nodeName);
+
+ static SystemView<?>
createGlobalTablePartitionStatesSystemView(DisasterRecoveryManager manager) {
return SystemViews.<GlobalTablePartitionState>clusterViewBuilder()
.name("GLOBAL_PARTITION_STATES")
.addColumn("ZONE_NAME", STRING, state -> state.zoneName)
@@ -58,12 +66,12 @@ class DisasterRecoverySystemViews {
// They are kept for compatibility with 3.0 version, to allow
columns being found by their old names.
.addColumn("STATE", STRING, state -> state.state.name())
// End of legacy columns list. New columns must be added below
this line.
- .dataProvider(systemViewPublisher(() ->
globalPartitionStatesAsync(manager)))
+ .dataProvider(systemViewPublisher(() ->
globalTablePartitionStatesAsync(manager)))
.build();
}
- static SystemView<?>
createLocalPartitionStatesSystemView(DisasterRecoveryManager manager) {
- return SystemViews.<SystemViewLocalPartitionState>clusterViewBuilder()
+ static SystemView<?>
createLocalTablePartitionStatesSystemView(DisasterRecoveryManager manager) {
+ return
SystemViews.<SystemViewLocalTablePartitionState>clusterViewBuilder()
.name("LOCAL_PARTITION_STATES")
.addColumn("NODE_NAME", STRING, state -> state.nodeName)
.addColumn("ZONE_NAME", STRING, state -> state.state.zoneName)
@@ -79,7 +87,31 @@ class DisasterRecoverySystemViews {
// They are kept for compatibility with 3.0 version, to allow
columns being found by their old names.
.addColumn("STATE", STRING, state -> state.state.state.name())
// End of legacy columns list. New columns must be added below
this line.
- .dataProvider(systemViewPublisher(() ->
localPartitionStatesAsync(manager)))
+ .dataProvider(systemViewPublisher(() ->
localTablePartitionStatesAsync(manager)))
+ .build();
+ }
+
+ static SystemView<?>
createGlobalZonePartitionStatesSystemView(DisasterRecoveryManager manager) {
+ return SystemViews.<GlobalPartitionState>clusterViewBuilder()
+ .name("GLOBAL_ZONE_PARTITION_STATES")
+ .addColumn("ZONE_NAME", STRING, state -> state.zoneName)
+ .addColumn("PARTITION_ID", INT32, state -> state.partitionId)
+ .addColumn("PARTITION_STATE", STRING, state ->
state.state.name())
+ .addColumn("ZONE_ID", INT32, state -> state.zoneId)
+ .dataProvider(systemViewPublisher(() ->
globalZonePartitionStatesAsync(manager)))
+ .build();
+ }
+
+ static SystemView<?>
createLocalZonePartitionStatesSystemView(DisasterRecoveryManager manager) {
+ return
SystemViews.<SystemViewLocalZonePartitionState>clusterViewBuilder()
+ .name("LOCAL_ZONE_PARTITION_STATES")
+ .addColumn("NODE_NAME", STRING, state -> state.nodeName)
+ .addColumn("ZONE_NAME", STRING, state -> state.state.zoneName)
+ .addColumn("PARTITION_ID", INT32, state ->
state.state.partitionId)
+ .addColumn("PARTITION_STATE", STRING, state ->
state.state.state.name())
+ .addColumn("ESTIMATED_ROWS", INT64, state ->
state.state.estimatedRows)
+ .addColumn("ZONE_ID", INT32, state -> state.state.zoneId)
+ .dataProvider(systemViewPublisher(() ->
localZonePartitionStatesAsync(manager)))
.build();
}
@@ -91,28 +123,59 @@ class DisasterRecoverySystemViews {
};
}
- private static CompletableFuture<Iterator<GlobalTablePartitionState>>
globalPartitionStatesAsync(DisasterRecoveryManager manager) {
+ private static CompletableFuture<Iterator<GlobalTablePartitionState>>
globalTablePartitionStatesAsync(DisasterRecoveryManager manager) {
return manager.globalTablePartitionStates(Set.of(),
Set.of()).thenApply(states -> states.values().stream()
- .sorted(GLOBAL_PARTITION_STATE_COMPARATOR)
+ .sorted(GLOBAL_TABLE_PARTITION_STATE_COMPARATOR)
.iterator()
);
}
- private static CompletableFuture<Iterator<SystemViewLocalPartitionState>>
localPartitionStatesAsync(DisasterRecoveryManager manager) {
+ private static
CompletableFuture<Iterator<SystemViewLocalTablePartitionState>>
localTablePartitionStatesAsync(
+ DisasterRecoveryManager manager
+ ) {
return manager.localTablePartitionStates(Set.of(), Set.of(),
Set.of()).thenApply(states -> states.values().stream()
.flatMap(statesByNodeName ->
statesByNodeName.entrySet().stream())
- .map(nodeStates -> new
SystemViewLocalPartitionState(nodeStates.getKey(), nodeStates.getValue()))
+ .map(nodeStates -> new
SystemViewLocalTablePartitionState(nodeStates.getKey(), nodeStates.getValue()))
.sorted(SYSTEM_VIEW_LOCAL_PARTITION_STATE_COMPARATOR)
.iterator()
);
}
- private static class SystemViewLocalPartitionState {
+ private static CompletableFuture<Iterator<GlobalPartitionState>>
globalZonePartitionStatesAsync(DisasterRecoveryManager manager) {
+ return manager.globalPartitionStates(Set.of(),
Set.of()).thenApply(states -> states.values().stream()
+ .sorted(GLOBAL_ZONE_PARTITION_STATE_COMPARATOR)
+ .iterator()
+ );
+ }
+
+ private static
CompletableFuture<Iterator<SystemViewLocalZonePartitionState>>
localZonePartitionStatesAsync(
+ DisasterRecoveryManager manager
+ ) {
+ return manager.localPartitionStates(Set.of(), Set.of(),
Set.of()).thenApply(states -> states.values().stream()
+ .flatMap(statesByNodeName ->
statesByNodeName.entrySet().stream())
+ .map(nodeStates -> new
SystemViewLocalZonePartitionState(nodeStates.getKey(), nodeStates.getValue()))
+ .sorted(SYSTEM_VIEW_LOCAL_ZONE_PARTITION_STATE_COMPARATOR)
+ .iterator()
+ );
+ }
+
+ private static class SystemViewLocalTablePartitionState {
private final String nodeName;
private final LocalTablePartitionState state;
- private SystemViewLocalPartitionState(String nodeName,
LocalTablePartitionState state) {
+ private SystemViewLocalTablePartitionState(String nodeName,
LocalTablePartitionState state) {
+ this.nodeName = nodeName;
+ this.state = state;
+ }
+ }
+
+ private static class SystemViewLocalZonePartitionState {
+ private final String nodeName;
+
+ private final LocalPartitionState state;
+
+ private SystemViewLocalZonePartitionState(String nodeName,
LocalPartitionState state) {
this.nodeName = nodeName;
this.state = state;
}
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
index db6c33d1488..109151400c8 100644
---
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
@@ -157,7 +157,6 @@ public class ItDisasterRecoveryManagerTest extends
ClusterPerTestIntegrationTest
}
@Test
- @WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "false")
@ZoneParams(nodes = 2, replicas = 2, partitions = 2)
void testLocalPartitionStateTable() throws Exception {
IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
@@ -236,7 +235,6 @@ public class ItDisasterRecoveryManagerTest extends
ClusterPerTestIntegrationTest
}
@Test
- @WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "false")
@ZoneParams(nodes = 2, replicas = 2, partitions = 2)
void testGlobalPartitionStateTable() throws Exception {
IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java
index 2266276707c..8e3cec3ac82 100644
---
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java
@@ -19,8 +19,8 @@ package org.apache.ignite.internal.disaster;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
-import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
-import static
org.apache.ignite.internal.lang.IgniteSystemProperties.COLOCATION_FEATURE_FLAG;
+import static
org.apache.ignite.internal.disaster.ItDisasterRecoveryZonePartitionsStatesSystemViewTest.estimatedSize;
+import static
org.apache.ignite.internal.lang.IgniteSystemProperties.enabledColocation;
import static
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateEnum.HEALTHY;
import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME;
import static
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.AVAILABLE;
@@ -31,7 +31,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.List;
-import java.util.Objects;
import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.app.IgniteImpl;
@@ -40,17 +39,13 @@ import
org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
import org.apache.ignite.internal.sql.engine.util.MetadataMatcher;
-import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.PublicApiThreadingTable;
-import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.sql.ColumnType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
/** For integration testing of disaster recovery system views. */
-// TODO https://issues.apache.org/jira/browse/IGNITE-25105
-@WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "false")
public class ItDisasterRecoverySystemViewTest extends BaseSqlIntegrationTest {
/** Table name. */
public static final String TABLE_NAME = "TEST_TABLE";
@@ -118,7 +113,11 @@ public class ItDisasterRecoverySystemViewTest extends
BaseSqlIntegrationTest {
createZoneAndTable(ZONE_NAME, TABLE_NAME, initialNodes(),
partitionsCount);
- waitLeaderOnAllPartitions(TABLE_NAME, partitionsCount);
+ if (enabledColocation()) {
+
ItDisasterRecoveryZonePartitionsStatesSystemViewTest.waitLeaderOnAllPartitions(ZONE_NAME,
partitionsCount);
+ } else {
+ waitLeaderOnAllPartitions(TABLE_NAME, partitionsCount);
+ }
int tableId = getTableId(DEFAULT_SCHEMA_NAME, TABLE_NAME);
@@ -136,7 +135,11 @@ public class ItDisasterRecoverySystemViewTest extends
BaseSqlIntegrationTest {
createZoneAndTable(ZONE_NAME, TABLE_NAME, initialNodes(),
partitionsCount);
- waitLeaderOnAllPartitions(TABLE_NAME, partitionsCount);
+ if (enabledColocation()) {
+
ItDisasterRecoveryZonePartitionsStatesSystemViewTest.waitLeaderOnAllPartitions(ZONE_NAME,
partitionsCount);
+ } else {
+ waitLeaderOnAllPartitions(TABLE_NAME, partitionsCount);
+ }
List<String> nodeNames =
CLUSTER.runningNodes().map(Ignite::name).sorted().collect(toList());
@@ -161,7 +164,11 @@ public class ItDisasterRecoverySystemViewTest extends
BaseSqlIntegrationTest {
createZoneAndTable(ZONE_NAME, TABLE_NAME, initialNodes(),
partitionsCount);
- waitLeaderOnAllPartitions(TABLE_NAME, partitionsCount);
+ if (enabledColocation()) {
+
ItDisasterRecoveryZonePartitionsStatesSystemViewTest.waitLeaderOnAllPartitions(ZONE_NAME,
partitionsCount);
+ } else {
+ waitLeaderOnAllPartitions(TABLE_NAME, partitionsCount);
+ }
insertPeople(
TABLE_NAME,
@@ -176,7 +183,7 @@ public class ItDisasterRecoverySystemViewTest extends
BaseSqlIntegrationTest {
// Small wait is specially added so that the follower can execute the
replicated "insert" command and the counter is honestly
// increased.
assertTrue(waitForCondition(
- () -> nodeNames.stream().allMatch(nodeName ->
estimatedSize(nodeName, TABLE_NAME, 0) >= 2L),
+ () -> nodeNames.stream().allMatch(nodeName ->
estimatedSize(nodeName, TABLE_NAME, 0, CLUSTER) >= 2L),
10,
1_000
));
@@ -226,18 +233,4 @@ public class ItDisasterRecoverySystemViewTest extends
BaseSqlIntegrationTest {
return
catalogManager.catalog(catalogManager.latestCatalogVersion()).table(schemaName,
tableName).id();
}
-
- private static long estimatedSize(String nodeName, String tableName, int
partitionId) {
- return CLUSTER.runningNodes()
- .filter(ignite -> nodeName.equals(ignite.name()))
- .map(ignite -> {
- TableImpl table =
unwrapTableImpl(ignite.tables().table(tableName));
-
- return
table.internalTable().storage().getMvPartition(partitionId);
- })
- .filter(Objects::nonNull)
- .map(MvPartitionStorage::estimatedSize)
- .findAny()
- .orElse(-1L);
- }
}
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryZonePartitionsStatesSystemViewTest.java
similarity index 61%
copy from
modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java
copy to
modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryZonePartitionsStatesSystemViewTest.java
index 2266276707c..83415a0f985 100644
---
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryZonePartitionsStatesSystemViewTest.java
@@ -22,7 +22,6 @@ import static
org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
import static
org.apache.ignite.internal.lang.IgniteSystemProperties.COLOCATION_FEATURE_FLAG;
import static
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateEnum.HEALTHY;
-import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME;
import static
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.AVAILABLE;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -34,24 +33,23 @@ import java.util.List;
import java.util.Objects;
import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.Cluster;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.raft.service.RaftGroupService;
-import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
import org.apache.ignite.internal.sql.engine.util.MetadataMatcher;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.table.TableImpl;
-import org.apache.ignite.internal.table.distributed.PublicApiThreadingTable;
import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.sql.ColumnType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
/** For integration testing of disaster recovery system views. */
-// TODO https://issues.apache.org/jira/browse/IGNITE-25105
-@WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "false")
-public class ItDisasterRecoverySystemViewTest extends BaseSqlIntegrationTest {
+@WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "true")
+public class ItDisasterRecoveryZonePartitionsStatesSystemViewTest extends
BaseSqlIntegrationTest {
/** Table name. */
public static final String TABLE_NAME = "TEST_TABLE";
@@ -70,61 +68,43 @@ public class ItDisasterRecoverySystemViewTest extends
BaseSqlIntegrationTest {
@Test
public void globalPatitionStatesMetadata() {
- assertQuery("SELECT * FROM SYSTEM.GLOBAL_PARTITION_STATES")
+ assertQuery("SELECT * FROM SYSTEM.GLOBAL_ZONE_PARTITION_STATES")
.columnMetadata(
new
MetadataMatcher().name("ZONE_NAME").type(ColumnType.STRING).nullable(true),
- new
MetadataMatcher().name("TABLE_ID").type(ColumnType.INT32).nullable(true),
- new
MetadataMatcher().name("SCHEMA_NAME").type(ColumnType.STRING).nullable(true),
- new
MetadataMatcher().name("TABLE_NAME").type(ColumnType.STRING).nullable(true),
new
MetadataMatcher().name("PARTITION_ID").type(ColumnType.INT32).nullable(true),
new
MetadataMatcher().name("PARTITION_STATE").type(ColumnType.STRING).nullable(true),
- new
MetadataMatcher().name("ZONE_ID").type(ColumnType.INT32).nullable(true),
- new
MetadataMatcher().name("SCHEMA_ID").type(ColumnType.INT32).nullable(true),
- // Legacy columns.
- new
MetadataMatcher().name("STATE").type(ColumnType.STRING).nullable(true)
+ new
MetadataMatcher().name("ZONE_ID").type(ColumnType.INT32).nullable(true)
)
.check();
}
@Test
public void localPatitionStatesMetadata() {
- assertQuery("SELECT * FROM SYSTEM.LOCAL_PARTITION_STATES")
+ assertQuery("SELECT * FROM SYSTEM.LOCAL_ZONE_PARTITION_STATES")
.columnMetadata(
new
MetadataMatcher().name("NODE_NAME").type(ColumnType.STRING).nullable(true),
new
MetadataMatcher().name("ZONE_NAME").type(ColumnType.STRING).nullable(true),
- new
MetadataMatcher().name("TABLE_ID").type(ColumnType.INT32).nullable(true),
- new
MetadataMatcher().name("SCHEMA_NAME").type(ColumnType.STRING).nullable(true),
- new
MetadataMatcher().name("TABLE_NAME").type(ColumnType.STRING).nullable(true),
new
MetadataMatcher().name("PARTITION_ID").type(ColumnType.INT32).nullable(true),
new
MetadataMatcher().name("PARTITION_STATE").type(ColumnType.STRING).nullable(true),
new
MetadataMatcher().name("ESTIMATED_ROWS").type(ColumnType.INT64).nullable(true),
- new
MetadataMatcher().name("ZONE_ID").type(ColumnType.INT32).nullable(true),
- new
MetadataMatcher().name("SCHEMA_ID").type(ColumnType.INT32).nullable(true),
- // Legacy columns.
- new
MetadataMatcher().name("STATE").type(ColumnType.STRING).nullable(true)
+ new
MetadataMatcher().name("ZONE_ID").type(ColumnType.INT32).nullable(true)
)
.check();
}
- @Test
- void testNoZonesAndTables() {
-
assertQuery(globalPartitionStatesSystemViewSql()).returnNothing().check();
-
assertQuery(localPartitionStatesSystemViewSql()).returnNothing().check();
- }
-
@Test
void testGlobalPartitionStatesSystemView() {
int partitionsCount = 2;
createZoneAndTable(ZONE_NAME, TABLE_NAME, initialNodes(),
partitionsCount);
- waitLeaderOnAllPartitions(TABLE_NAME, partitionsCount);
+ waitLeaderOnAllPartitions(ZONE_NAME, partitionsCount);
- int tableId = getTableId(DEFAULT_SCHEMA_NAME, TABLE_NAME);
+ int zoneId = getZoneId(ZONE_NAME);
- assertQuery(globalPartitionStatesSystemViewSql())
- .returns(ZONE_NAME, tableId, DEFAULT_SCHEMA_NAME, TABLE_NAME,
0, AVAILABLE.name())
- .returns(ZONE_NAME, tableId, DEFAULT_SCHEMA_NAME, TABLE_NAME,
1, AVAILABLE.name())
+ assertQuery(globalZonePartitionStatesSystemViewSql())
+ .returns(ZONE_NAME, 0, AVAILABLE.name(), zoneId)
+ .returns(ZONE_NAME, 1, AVAILABLE.name(), zoneId)
.check();
}
@@ -136,20 +116,20 @@ public class ItDisasterRecoverySystemViewTest extends
BaseSqlIntegrationTest {
createZoneAndTable(ZONE_NAME, TABLE_NAME, initialNodes(),
partitionsCount);
- waitLeaderOnAllPartitions(TABLE_NAME, partitionsCount);
+ waitLeaderOnAllPartitions(ZONE_NAME, partitionsCount);
List<String> nodeNames =
CLUSTER.runningNodes().map(Ignite::name).sorted().collect(toList());
String nodeName0 = nodeNames.get(0);
String nodeName1 = nodeNames.get(1);
- int tableId = getTableId(DEFAULT_SCHEMA_NAME, TABLE_NAME);
+ int zoneId = getZoneId(ZONE_NAME);
- assertQuery(localPartitionStatesSystemViewSql())
- .returns(nodeName0, ZONE_NAME, tableId, DEFAULT_SCHEMA_NAME,
TABLE_NAME, 0, HEALTHY.name(), 0L)
- .returns(nodeName0, ZONE_NAME, tableId, DEFAULT_SCHEMA_NAME,
TABLE_NAME, 1, HEALTHY.name(), 0L)
- .returns(nodeName1, ZONE_NAME, tableId, DEFAULT_SCHEMA_NAME,
TABLE_NAME, 0, HEALTHY.name(), 0L)
- .returns(nodeName1, ZONE_NAME, tableId, DEFAULT_SCHEMA_NAME,
TABLE_NAME, 1, HEALTHY.name(), 0L)
+ assertQuery(localZonePartitionStatesSystemViewSql())
+ .returns(nodeName0, ZONE_NAME, 0, HEALTHY.name(), 0L, zoneId)
+ .returns(nodeName0, ZONE_NAME, 1, HEALTHY.name(), 0L, zoneId)
+ .returns(nodeName1, ZONE_NAME, 0, HEALTHY.name(), 0L, zoneId)
+ .returns(nodeName1, ZONE_NAME, 1, HEALTHY.name(), 0L, zoneId)
.check();
}
@@ -161,7 +141,7 @@ public class ItDisasterRecoverySystemViewTest extends
BaseSqlIntegrationTest {
createZoneAndTable(ZONE_NAME, TABLE_NAME, initialNodes(),
partitionsCount);
- waitLeaderOnAllPartitions(TABLE_NAME, partitionsCount);
+ waitLeaderOnAllPartitions(ZONE_NAME, partitionsCount);
insertPeople(
TABLE_NAME,
@@ -171,64 +151,63 @@ public class ItDisasterRecoverySystemViewTest extends
BaseSqlIntegrationTest {
List<String> nodeNames =
CLUSTER.runningNodes().map(Ignite::name).sorted().collect(toList());
- int tableId = getTableId(DEFAULT_SCHEMA_NAME, TABLE_NAME);
-
// Small wait is specially added so that the follower can execute the
replicated "insert" command and the counter is honestly
// increased.
assertTrue(waitForCondition(
- () -> nodeNames.stream().allMatch(nodeName ->
estimatedSize(nodeName, TABLE_NAME, 0) >= 2L),
+ () -> nodeNames.stream().allMatch(nodeName ->
estimatedSize(nodeName, TABLE_NAME, 0, CLUSTER) >= 2L),
10,
1_000
));
- assertQuery(localPartitionStatesSystemViewSql())
- .returns(nodeNames.get(0), ZONE_NAME, tableId,
DEFAULT_SCHEMA_NAME, TABLE_NAME, 0, HEALTHY.name(), 2L)
- .returns(nodeNames.get(1), ZONE_NAME, tableId,
DEFAULT_SCHEMA_NAME, TABLE_NAME, 0, HEALTHY.name(), 2L)
+ int zoneId = getZoneId(ZONE_NAME);
+
+ assertQuery(localZonePartitionStatesSystemViewSql())
+ .returns(nodeNames.get(0), ZONE_NAME, 0, HEALTHY.name(), 2L,
zoneId)
+ .returns(nodeNames.get(1), ZONE_NAME, 0, HEALTHY.name(), 2L,
zoneId)
.check();
}
/**
- * waiting a leader for all partitions because later we expect that
partitions will be in AVAILABLE state. Without it there won't be
+ * Waiting a leader for all partitions because later we expect that
partitions will be in AVAILABLE state. Without it there won't be
* log updating (see {@code LocalPartitionStateEnumWithLogIndex#of}) and
then in SYSTEM.*_PARTITION_STATES we will get UNAVAILABLE state
* instead of the desired one. That's why in {@link
#testGlobalPartitionStatesSystemView()} and
* {@link #testLocalPartitionStatesSystemView()} we must manually trigger
{@link RaftGroupService#refreshLeader()} that will lead
* partitions to the proper states.
*
- * @param tableName A table whose partitions will do a leader refresh.
+ * @param zoneName A zone whose partitions will do a leader refresh.
* @param partitionsCount Expected the table partitions count for
iterating over them.
*/
- private static void waitLeaderOnAllPartitions(String tableName, int
partitionsCount) {
+ static void waitLeaderOnAllPartitions(String zoneName, int
partitionsCount) {
IgniteImpl node = unwrapIgniteImpl(CLUSTER.node(0));
- TableImpl table = ((PublicApiThreadingTable)
node.tables().table(tableName)).unwrap(TableImpl.class);
-
- int tableId = table.tableId();
+ int zoneId = getZoneId(zoneName);
IntStream.range(0, partitionsCount).forEach(partId -> assertThat(
node.replicaManager()
- .replica(new TablePartitionId(tableId, partId))
+ .replica(new ZonePartitionId(zoneId, partId))
.thenCompose(replica ->
replica.raftClient().refreshLeader()),
willCompleteSuccessfully()
));
}
- private static String globalPartitionStatesSystemViewSql() {
- return "SELECT ZONE_NAME, TABLE_ID, SCHEMA_NAME, TABLE_NAME,
PARTITION_ID, STATE FROM SYSTEM.GLOBAL_PARTITION_STATES";
+ private static String globalZonePartitionStatesSystemViewSql() {
+ return "SELECT ZONE_NAME, PARTITION_ID, PARTITION_STATE, ZONE_ID FROM
SYSTEM.GLOBAL_ZONE_PARTITION_STATES "
+ + "WHERE ZONE_NAME != 'Default'";
}
- private static String localPartitionStatesSystemViewSql() {
- return "SELECT NODE_NAME, ZONE_NAME, TABLE_ID, SCHEMA_NAME,
TABLE_NAME, PARTITION_ID, STATE, ESTIMATED_ROWS"
- + " FROM SYSTEM.LOCAL_PARTITION_STATES";
+ private static String localZonePartitionStatesSystemViewSql() {
+ return "SELECT NODE_NAME, ZONE_NAME, PARTITION_ID, PARTITION_STATE,
ESTIMATED_ROWS, ZONE_ID"
+ + " FROM SYSTEM.LOCAL_ZONE_PARTITION_STATES WHERE ZONE_NAME !=
'Default'";
}
- private static int getTableId(String schemaName, String tableName) {
+ private static int getZoneId(String zoneName) {
CatalogManager catalogManager =
unwrapIgniteImpl(CLUSTER.aliveNode()).catalogManager();
- return
catalogManager.catalog(catalogManager.latestCatalogVersion()).table(schemaName,
tableName).id();
+ return
catalogManager.catalog(catalogManager.latestCatalogVersion()).zone(zoneName).id();
}
- private static long estimatedSize(String nodeName, String tableName, int
partitionId) {
- return CLUSTER.runningNodes()
+ static long estimatedSize(String nodeName, String tableName, int
partitionId, Cluster cluster) {
+ return cluster.runningNodes()
.filter(ignite -> nodeName.equals(ignite.name()))
.map(ignite -> {
TableImpl table =
unwrapTableImpl(ignite.tables().table(tableName));