This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f5875a74777 IGNITE-25870 Unify behavior of REST and CLI (#6288)
f5875a74777 is described below
commit f5875a74777170007fa1c15a2320f9ab8223153f
Author: Cyrill <[email protected]>
AuthorDate: Wed Jul 23 14:28:25 2025 +0300
IGNITE-25870 Unify behavior of REST and CLI (#6288)
---
.../network/PartitionReplicationMessageGroup.java | 12 ++
.../disaster/LocalPartitionStatesResponse.java | 2 +-
...e.java => LocalTablePartitionStateMessage.java} | 14 +-
.../disaster/LocalTablePartitionStateRequest.java} | 41 +---
....java => LocalTablePartitionStateResponse.java} | 10 +-
.../rest/recovery/DisasterRecoveryController.java | 59 ------
.../disaster/DisasterRecoveryManager.java | 225 +++++++++++++++++----
.../disaster/LocalPartitionStateMessageByNode.java | 4 +
.../disaster/ItDisasterRecoveryManagerTest.java | 82 ++++++++
9 files changed, 308 insertions(+), 141 deletions(-)
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
index 87373e7e836..09c2f55e802 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
@@ -35,6 +35,9 @@ import
org.apache.ignite.internal.partition.replicator.network.command.WriteInte
import
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateMessage;
import
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStatesRequest;
import
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStatesResponse;
+import
org.apache.ignite.internal.partition.replicator.network.disaster.LocalTablePartitionStateMessage;
+import
org.apache.ignite.internal.partition.replicator.network.disaster.LocalTablePartitionStateRequest;
+import
org.apache.ignite.internal.partition.replicator.network.disaster.LocalTablePartitionStateResponse;
import
org.apache.ignite.internal.partition.replicator.network.message.HasDataRequest;
import
org.apache.ignite.internal.partition.replicator.network.message.HasDataResponse;
import
org.apache.ignite.internal.partition.replicator.network.raft.PartitionSnapshotMeta;
@@ -287,5 +290,14 @@ public interface PartitionReplicationMessageGroup {
/** Message type for {@link LocalPartitionStatesResponse}. */
short LOCAL_PARTITION_STATE_RESPONSE = 102;
+
+ /** Message type for {@link LocalTablePartitionStateMessage}. */
+ short LOCAL_TABLE_PARTITION_STATE = 103;
+
+ /** Message type for {@link LocalTablePartitionStateRequest}. */
+ short LOCAL_TABLE_PARTITION_STATE_REQUEST = 104;
+
+ /** Message type for {@link LocalTablePartitionStateResponse}. */
+ short LOCAL_TABLE_PARTITION_STATE_RESPONSE = 105;
}
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalPartitionStatesResponse.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalPartitionStatesResponse.java
index 2ac461e0dbc..9841c7428ef 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalPartitionStatesResponse.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalPartitionStatesResponse.java
@@ -23,7 +23,7 @@ import
org.apache.ignite.internal.network.annotations.Transferable;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages;
/**
- * Response for {@link LocalPartitionStatesResponse}.
+ * Response for {@link LocalPartitionStatesRequest}.
*/
@Transferable(DisasterRecoveryMessages.LOCAL_PARTITION_STATE_RESPONSE)
public interface LocalPartitionStatesResponse extends NetworkMessage {
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalPartitionStatesResponse.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTablePartitionStateMessage.java
similarity index 69%
copy from
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalPartitionStatesResponse.java
copy to
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTablePartitionStateMessage.java
index 2ac461e0dbc..d0478b5b933 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalPartitionStatesResponse.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTablePartitionStateMessage.java
@@ -17,15 +17,19 @@
package org.apache.ignite.internal.partition.replicator.network.disaster;
-import java.util.List;
+import java.util.Map;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.annotations.Transferable;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
/**
- * Response for {@link LocalPartitionStatesResponse}.
+ * A message carrying the estimated number of rows for table partitions stored on
this node.
*/
-@Transferable(DisasterRecoveryMessages.LOCAL_PARTITION_STATE_RESPONSE)
-public interface LocalPartitionStatesResponse extends NetworkMessage {
- List<LocalPartitionStateMessage> states();
+@Transferable(DisasterRecoveryMessages.LOCAL_TABLE_PARTITION_STATE)
+public interface LocalTablePartitionStateMessage extends NetworkMessage {
+ /**
+ * Returns the estimated number of rows for each table partition stored on this node.
+ */
+ Map<TablePartitionIdMessage, Long> tablePartitionIdToEstimatedRowsMap();
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/TableState.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTablePartitionStateRequest.java
similarity index 52%
rename from
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/TableState.java
rename to
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTablePartitionStateRequest.java
index e3d84ab12df..63254c42172 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/TableState.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTablePartitionStateRequest.java
@@ -15,39 +15,20 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.table.distributed.disaster;
+package org.apache.ignite.internal.partition.replicator.network.disaster;
-import org.apache.ignite.internal.tostring.S;
+import java.util.Set;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages;
+import org.apache.ignite.internal.replicator.message.ZonePartitionIdMessage;
/**
- * Table state.
+ * Request for reading table states from the node.
*/
-public class TableState {
+@Transferable(DisasterRecoveryMessages.LOCAL_TABLE_PARTITION_STATE_REQUEST)
+public interface LocalTablePartitionStateRequest extends NetworkMessage {
+ Set<ZonePartitionIdMessage> zonePartitionIds();
- private final int tableId;
- private final String schemaName;
- private final String tableName;
-
- TableState(int tableId, String tableName, String schemaName) {
- this.tableId = tableId;
- this.schemaName = schemaName;
- this.tableName = tableName;
- }
-
- public int tableId() {
- return tableId;
- }
-
- public String schemaName() {
- return schemaName;
- }
-
- public String tableName() {
- return tableName;
- }
-
- @Override
- public String toString() {
- return S.toString(TableState.class, this);
- }
+ int catalogVersion();
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalPartitionStatesResponse.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTablePartitionStateResponse.java
similarity index 80%
copy from
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalPartitionStatesResponse.java
copy to
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTablePartitionStateResponse.java
index 2ac461e0dbc..4ca7e0b1a26 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalPartitionStatesResponse.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTablePartitionStateResponse.java
@@ -17,15 +17,15 @@
package org.apache.ignite.internal.partition.replicator.network.disaster;
-import java.util.List;
+import java.util.Set;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.annotations.Transferable;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages;
/**
- * Response for {@link LocalPartitionStatesResponse}.
+ * Response for {@link LocalTablePartitionStateRequest}.
*/
-@Transferable(DisasterRecoveryMessages.LOCAL_PARTITION_STATE_RESPONSE)
-public interface LocalPartitionStatesResponse extends NetworkMessage {
- List<LocalPartitionStateMessage> states();
+@Transferable(DisasterRecoveryMessages.LOCAL_TABLE_PARTITION_STATE_RESPONSE)
+public interface LocalTablePartitionStateResponse extends NetworkMessage {
+ Set<LocalTablePartitionStateMessage> states();
}
diff --git
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
index 99a0988d3a2..bae1f0741c1 100644
---
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
+++
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
@@ -55,7 +55,6 @@ import
org.apache.ignite.internal.table.distributed.disaster.LocalPartitionState
import
org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateByNode;
import
org.apache.ignite.internal.table.distributed.disaster.LocalTablePartitionState;
import
org.apache.ignite.internal.table.distributed.disaster.LocalTablePartitionStateByNode;
-import org.apache.ignite.internal.table.distributed.disaster.TableState;
import org.apache.ignite.table.QualifiedName;
/**
@@ -78,13 +77,6 @@ public class DisasterRecoveryController implements
DisasterRecoveryApi, Resource
Optional<Set<String>> nodeNames,
Optional<Set<Integer>> partitionIds
) {
- if (nodeProperties.colocationEnabled()) {
- // The table response is actually a superset of the zone response,
so should be fine to convert it.
- CompletableFuture<LocalZonePartitionStatesResponse> zoneStates =
- getZoneLocalPartitionStates(zoneNames, nodeNames,
partitionIds);
- return zoneStates.thenApply(zoneResponse ->
convertLocalZoneToTableStates(zoneResponse, disasterRecoveryManager));
- }
-
return disasterRecoveryManager.localTablePartitionStates(
zoneNames.orElse(Set.of()),
nodeNames.orElse(Set.of()),
@@ -98,14 +90,6 @@ public class DisasterRecoveryController implements
DisasterRecoveryApi, Resource
Optional<Set<String>> zoneNames,
Optional<Set<Integer>> partitionIds
) {
- if (nodeProperties.colocationEnabled()) {
- // The table response is actually a superset of the zone response,
so should be fine to convert it.
-
- CompletableFuture<GlobalZonePartitionStatesResponse> zoneStates =
- getZoneGlobalPartitionStates(zoneNames, partitionIds);
- return zoneStates.thenApply(zoneResponse ->
convertGlobalZoneToTableStates(zoneResponse, disasterRecoveryManager));
- }
-
return disasterRecoveryManager.globalTablePartitionStates(
zoneNames.orElse(Set.of()),
partitionIds.orElse(Set.of())
@@ -192,28 +176,6 @@ public class DisasterRecoveryController implements
DisasterRecoveryApi, Resource
).thenApply(DisasterRecoveryController::convertGlobalZoneStates);
}
- private static LocalPartitionStatesResponse convertLocalZoneToTableStates(
- LocalZonePartitionStatesResponse zoneResponse,
- DisasterRecoveryManager manager) {
- List<LocalPartitionStateResponse> states = new ArrayList<>();
-
- for (LocalZonePartitionStateResponse zoneState :
zoneResponse.states()) {
- for (TableState tableState :
manager.zoneTablesStates(zoneState.zoneName())) {
- states.add(new LocalPartitionStateResponse(
- zoneState.nodeName(),
- zoneState.zoneName(),
- tableState.schemaName(),
- tableState.tableId(),
- tableState.tableName(),
- zoneState.partitionId(),
- zoneState.state(),
- zoneState.estimatedRows()
- ));
- }
- }
- return createLocalPartitionStatesResponse(states);
- }
-
private static LocalPartitionStatesResponse
convertLocalTableStates(Map<TablePartitionId, LocalTablePartitionStateByNode>
localStates) {
List<LocalPartitionStateResponse> states = new ArrayList<>();
@@ -290,27 +252,6 @@ public class DisasterRecoveryController implements
DisasterRecoveryApi, Resource
return createGlobalPartitionStatesResponse(states);
}
- private static GlobalPartitionStatesResponse
convertGlobalZoneToTableStates(
- GlobalZonePartitionStatesResponse zoneResponse,
- DisasterRecoveryManager manager
- ) {
- List<GlobalPartitionStateResponse> states = new ArrayList<>();
-
- for (GlobalZonePartitionStateResponse zoneState :
zoneResponse.states()) {
- for (TableState tableState :
manager.zoneTablesStates(zoneState.zoneName())) {
- states.add(new GlobalPartitionStateResponse(
- zoneState.zoneName(),
- tableState.schemaName(),
- tableState.tableId(),
- tableState.tableName(),
- zoneState.partitionId(),
- zoneState.state()
- ));
- }
- }
- return createGlobalPartitionStatesResponse(states);
- }
-
private static GlobalPartitionStatesResponse
createGlobalPartitionStatesResponse(List<GlobalPartitionStateResponse> states) {
// Sort the output conveniently.
states.sort(comparing(GlobalPartitionStateResponse::schemaName)
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index f049bb7829a..bc22e3da915 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.table.distributed.disaster;
import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.stream.Collectors.groupingBy;
@@ -102,6 +103,9 @@ import
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPar
import
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateMessage;
import
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStatesRequest;
import
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStatesResponse;
+import
org.apache.ignite.internal.partition.replicator.network.disaster.LocalTablePartitionStateMessage;
+import
org.apache.ignite.internal.partition.replicator.network.disaster.LocalTablePartitionStateRequest;
+import
org.apache.ignite.internal.partition.replicator.network.disaster.LocalTablePartitionStateResponse;
import org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.raft.Loza;
@@ -109,10 +113,13 @@ import
org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
+import org.apache.ignite.internal.replicator.message.ZonePartitionIdMessage;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.systemview.api.SystemView;
import org.apache.ignite.internal.systemview.api.SystemViewManager;
import org.apache.ignite.internal.systemview.api.SystemViewProvider;
+import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.TableManager;
import
org.apache.ignite.internal.table.distributed.disaster.exceptions.DisasterRecoveryException;
@@ -787,7 +794,9 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
catalog,
zoneState()
)
- .thenApply(res -> zoneStateToTableState(res, catalog))
+ .thenCompose(res ->
tableStateForZone(toZonesOnNodes(res), catalog.version())
+ .thenApply(tableState ->
zoneStateToTableState(res, tableState, catalog))
+ )
.thenApply(res -> normalizeTableLocal(res, catalog));
}
@@ -826,7 +835,9 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
catalog,
zoneState()
)
- .thenApply(res -> zoneStateToTableState(res, catalog))
+ .thenCompose(res ->
tableStateForZone(toZonesOnNodes(res), catalog.version())
+ .thenApply(tableState ->
zoneStateToTableState(res, tableState, catalog))
+ )
.thenApply(res -> normalizeTableLocal(res, catalog))
.thenApply(res -> assembleTableGlobal(res,
partitionIds, catalog));
}
@@ -845,16 +856,108 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
}
}
- private Map<TablePartitionId, LocalPartitionStateMessageByNode>
zoneStateToTableState(
+ /**
+ * Converts {@link LocalPartitionStateMessageByNode} to a mapping of node
names to the set of zone partitions reported by each node.
+ *
+ * @param partitionStateMap Partition state map.
+ * @return Mapping of node names to the set of zone partitions.
+ */
+ private static Map<String, Set<ZonePartitionId>> toZonesOnNodes(
+ Map<ZonePartitionId, LocalPartitionStateMessageByNode>
partitionStateMap
+ ) {
+ Map<String, Set<ZonePartitionId>> res = new HashMap<>();
+
+ for (Map.Entry<ZonePartitionId, LocalPartitionStateMessageByNode>
entry : partitionStateMap.entrySet()) {
+ ZonePartitionId zonePartitionId = entry.getKey();
+
+ LocalPartitionStateMessageByNode
zoneLocalPartitionStateMessageByNode = entry.getValue();
+
+ for (String nodeName :
zoneLocalPartitionStateMessageByNode.nodes()) {
+ res.computeIfAbsent(nodeName, k -> new
HashSet<>()).add(zonePartitionId);
+ }
+ }
+
+ return res;
+ }
+
+ /**
+ * Returns the estimated number of rows for each table partition in the
specified zone partitions, grouped by node name.
+ *
+ * <p>The result is requested from the nodes specified in {@code
zonesOnNodes.keySet()} -
+ * these are the nodes we previously received partition states from.
+ *
+ * @param zonesOnNodes Mapping of node names to the set of zone partitions.
+ * @param catalogVersion Catalog version.
+ * @return Future with the mapping of node names to estimated row counts per table partition.
+ */
+ private CompletableFuture<Map<String, Map<TablePartitionIdMessage, Long>>>
tableStateForZone(
+ Map<String, Set<ZonePartitionId>> zonesOnNodes,
+ int catalogVersion
+ ) {
+ Map<String, Map<TablePartitionIdMessage, Long>> result = new
ConcurrentHashMap<>();
+
+ CompletableFuture<?>[] futures = zonesOnNodes.entrySet().stream()
+ .map(entry ->
+ tableStateForZoneOnNode(catalogVersion,
entry.getKey(), entry.getValue())
+ .thenAccept(response ->
+ response.states().forEach(state -> {
+
result.computeIfAbsent(entry.getKey(), k -> new ConcurrentHashMap<>())
+
.putAll(state.tablePartitionIdToEstimatedRowsMap());
+ })
+ )
+ ).toArray(CompletableFuture[]::new);
+
+ return allOf(futures).handle((unused, err) -> {
+ if (err != null) {
+ throw new DisasterRecoveryException(PARTITION_STATE_ERR, err);
+ }
+
+ return result;
+ });
+ }
+
+ /**
+ * Requests the estimated number of rows for each table partition in the
specified zone partitions from a single node.
+ *
+ * @param catalogVersion Catalog version.
+ * @param node Node we get table partition states from.
+ * @param zones Set of zone partitions.
+ * @return Future with the node's response.
+ */
+ private CompletableFuture<LocalTablePartitionStateResponse>
tableStateForZoneOnNode(
+ int catalogVersion,
+ String node,
+ Set<ZonePartitionId> zones
+ ) {
+ Set<ZonePartitionIdMessage> zoneMessage = zones.stream()
+ .map(zonePartitionId ->
toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY, zonePartitionId))
+ .collect(toSet());
+ LocalTablePartitionStateRequest request =
PARTITION_REPLICATION_MESSAGES_FACTORY.localTablePartitionStateRequest()
+ .zonePartitionIds(zoneMessage)
+ .catalogVersion(catalogVersion)
+ .build();
+
+ return messagingService.invoke(node, request,
TimeUnit.SECONDS.toMillis(TIMEOUT_SECONDS))
+ .thenApply(networkMessage -> {
+ assert networkMessage instanceof
LocalTablePartitionStateResponse : networkMessage;
+
+ return (LocalTablePartitionStateResponse) networkMessage;
+ });
+ }
+
+ private static Map<TablePartitionId, LocalPartitionStateMessageByNode>
zoneStateToTableState(
Map<ZonePartitionId, LocalPartitionStateMessageByNode>
partitionStateMap,
+ Map<String, Map<TablePartitionIdMessage, Long>> tableState,
Catalog catalog
) {
Map<TablePartitionId, LocalPartitionStateMessageByNode> res = new
HashMap<>();
for (Map.Entry<ZonePartitionId, LocalPartitionStateMessageByNode>
entry : partitionStateMap.entrySet()) {
- int zoneId = entry.getKey().zoneId();
+ ZonePartitionId zonePartitionId = entry.getKey();
+
+ int zoneId = zonePartitionId.zoneId();
- int partitionId = entry.getKey().partitionId();
+ int partitionId = zonePartitionId.partitionId();
LocalPartitionStateMessageByNode
zoneLocalPartitionStateMessageByNode = entry.getValue();
@@ -863,30 +966,27 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
for (CatalogTableDescriptor tableDescriptor :
catalog.tables(zoneId)) {
TablePartitionId tablePartitionId = new
TablePartitionId(tableDescriptor.id(), partitionId);
+ TablePartitionIdMessage tablePartitionIdMessage =
+ toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY,
tablePartitionId);
+
for (Map.Entry<String, LocalPartitionStateMessage> nodeEntry :
zoneLocalPartitionStateMessageByNode.entrySet()) {
String nodeName = nodeEntry.getKey();
- LocalPartitionStateMessage localPartitionStateMessage =
nodeEntry.getValue();
-
- TableViewInternal tableViewInternal =
tableManager.cachedTable(tablePartitionId.tableId());
+ Long estimatedRows = tableState.getOrDefault(nodeName,
emptyMap())
+ .get(tablePartitionIdMessage);
- if (tableViewInternal == null) {
+ if (estimatedRows == null) {
continue;
}
- MvPartitionStorage partitionStorage =
tableViewInternal.internalTable().storage()
- .getMvPartition(tablePartitionId.partitionId());
-
- if (partitionStorage == null) {
- continue;
- }
+ LocalPartitionStateMessage localPartitionStateMessage =
nodeEntry.getValue();
LocalPartitionStateMessage tableLocalPartitionStateMessage
=
PARTITION_REPLICATION_MESSAGES_FACTORY.localPartitionStateMessage()
-
.partitionId(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY,
tablePartitionId))
+ .partitionId(tablePartitionIdMessage)
.state(localPartitionStateMessage.state())
.logIndex(localPartitionStateMessage.logIndex())
-
.estimatedRows(partitionStorage.estimatedSize())
+ .estimatedRows(estimatedRows)
.build();
tableLocalPartitionStateMessageByNode.put(nodeName,
tableLocalPartitionStateMessage);
@@ -1100,9 +1200,45 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
private void handleMessage(NetworkMessage message, ClusterNode sender,
@Nullable Long correlationId) {
if (message instanceof LocalPartitionStatesRequest) {
handleLocalPartitionStatesRequest((LocalPartitionStatesRequest)
message, sender, correlationId);
+ } else if (message instanceof LocalTablePartitionStateRequest) {
+ handleLocalTableStateRequest((LocalTablePartitionStateRequest)
message, sender, correlationId);
}
}
+ private void handleLocalTableStateRequest(LocalTablePartitionStateRequest
request, ClusterNode sender, @Nullable Long correlationId) {
+ assert correlationId != null : "request=" + request + ", sender=" +
sender;
+
+ int catalogVersion = request.catalogVersion();
+
+ Set<ZonePartitionId> requesedPartitions =
request.zonePartitionIds().stream()
+ .map(ZonePartitionIdMessage::asZonePartitionId)
+ .collect(toSet());
+
+ catalogManager.catalogReadyFuture(catalogVersion).thenRunAsync(() -> {
+ Set<LocalTablePartitionStateMessage> statesList = new HashSet<>();
+
+ raftManager.forEach((raftNodeId, raftGroupService) -> {
+ if (raftNodeId.groupId() instanceof ZonePartitionId) {
+
+ LocalTablePartitionStateMessage message =
handleSizeRequestForTablesInZone(
+ requesedPartitions,
+ (ZonePartitionId) raftNodeId.groupId()
+ );
+
+ if (message != null) {
+ statesList.add(message);
+ }
+ }
+ });
+
+ LocalTablePartitionStateResponse response =
PARTITION_REPLICATION_MESSAGES_FACTORY.localTablePartitionStateResponse()
+ .states(statesList)
+ .build();
+
+ messagingService.respond(sender, response, correlationId);
+ }, threadPool);
+ }
+
private void handleLocalPartitionStatesRequest(LocalPartitionStatesRequest
request, ClusterNode sender, @Nullable Long correlationId) {
assert correlationId != null : "request=" + request + ", sender=" +
sender;
@@ -1145,6 +1281,19 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
}, threadPool);
}
+ private @Nullable LocalTablePartitionStateMessage
handleSizeRequestForTablesInZone(
+ Set<ZonePartitionId> requestedPartitions,
+ ZonePartitionId zonePartitionId
+ ) {
+ if (!containsOrEmpty(zonePartitionId, requestedPartitions)) {
+ return null;
+ }
+
+ return
PARTITION_REPLICATION_MESSAGES_FACTORY.localTablePartitionStateMessage()
+
.tablePartitionIdToEstimatedRowsMap(estimatedSizeMap(zonePartitionId))
+ .build();
+ }
+
private @Nullable LocalPartitionStateMessage handleStateRequestForZone(
LocalPartitionStatesRequest request,
RaftGroupService raftGroupService,
@@ -1183,6 +1332,24 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
.sum();
}
+ private Map<TablePartitionIdMessage, Long>
estimatedSizeMap(ZonePartitionId zonePartitionId) {
+ Map<TablePartitionIdMessage, Long> partitionIdToEstimatedRowsMap = new
HashMap<>();
+
+ for (TableImpl tableImpl :
tableManager.zoneTables(zonePartitionId.zoneId())) {
+ MvPartitionStorage mvPartitionStorage =
tableImpl.internalTable().storage().getMvPartition(zonePartitionId.partitionId());
+
+ if (mvPartitionStorage != null) {
+ partitionIdToEstimatedRowsMap.put(
+ toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY,
+ new TablePartitionId(tableImpl.tableId(),
zonePartitionId.partitionId())),
+ mvPartitionStorage.estimatedSize()
+ );
+ }
+ }
+
+ return partitionIdToEstimatedRowsMap;
+ }
+
private @Nullable LocalPartitionStateMessage handleStateRequestForTable(
LocalPartitionStatesRequest request,
RaftGroupService raftGroupService,
@@ -1571,30 +1738,6 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
return catalog;
}
- /**
- * A helper method to get table states for the specified zone when
colocation is enabled.
- *
- * @param zoneName Zone name.
- * @return A collection of table states.
- */
- public Collection<TableState> zoneTablesStates(String zoneName) {
- Catalog catalog = catalogLatestVersion();
- int zoneId = zoneDescriptor(catalog, zoneName).id();
-
- return tableManager.zoneTables(zoneId).stream()
- .map(table -> {
- CatalogTableDescriptor tableDescriptor =
catalog.table(table.tableId());
- String schemaName =
catalog.schema(tableDescriptor.schemaId()).name();
-
- return new TableState(
- table.tableId(),
- table.qualifiedName().objectName(),
- schemaName
- );
- })
- .collect(toList());
- }
-
private static CatalogTableDescriptor tableDescriptor(Catalog catalog,
String schemaName, String tableName) {
CatalogTableDescriptor tableDescriptor = catalog.table(schemaName,
tableName);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionStateMessageByNode.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionStateMessageByNode.java
index b1ba02cad36..e5598d70213 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionStateMessageByNode.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionStateMessageByNode.java
@@ -60,6 +60,10 @@ public class LocalPartitionStateMessageByNode {
return map.get(node);
}
+ public Set<String> nodes() {
+ return map.keySet();
+ }
+
@Override
public String toString() {
return map.toString();
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
index 02eca4abbec..382c07b1ac0 100644
---
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
@@ -26,6 +26,7 @@ import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.createZone;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.getDefaultZone;
import static
org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.aMapWithSize;
@@ -43,6 +44,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
@@ -268,6 +270,86 @@ public class ItDisasterRecoveryManagerTest extends
ClusterPerTestIntegrationTest
assertThat(selectAll(), hasSize(4));
}
+ @WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG,
value = "false")
+ @Test
+ @ZoneParams(nodes = 2, replicas = 1, partitions = 2)
+ void testEstimatedRowsTable() throws Exception {
+ validateEstimatedRows();
+ }
+
+ @WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG,
value = "true")
+ @Test
+ @ZoneParams(nodes = 2, replicas = 1, partitions = 2)
+ void testEstimatedRowsTableZone() throws Exception {
+ validateEstimatedRows();
+ }
+
+ private void validateEstimatedRows() throws InterruptedException {
+ IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
+
+ insert(0, 0);
+ insert(1, 1);
+
+ // Wait for replication to finish.
+ assertTrue(waitForCondition(() -> {
+ CompletableFuture<Map<TablePartitionId,
LocalTablePartitionStateByNode>> localStateTableFuture =
+
node.disasterRecoveryManager().localTablePartitionStates(emptySet(),
emptySet(), emptySet());
+
+ assertThat(localStateTableFuture,
willCompleteSuccessfully());
+ Map<TablePartitionId, LocalTablePartitionStateByNode>
localState;
+ try {
+ localState = localStateTableFuture.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+
+ Set<Long> size = localState.values().stream()
+ .flatMap(localTablePartitionStateByNode ->
localTablePartitionStateByNode.values().stream())
+ .map(state -> state.estimatedRows)
+ .collect(Collectors.toSet());
+ // There are 2 nodes, 2 partitions and 1 replica, so we
should have 2 entries in localState (one for each partition),
+ // LocalTablePartitionStateByNode should have an entry for
either the first or the second node with 1 row.
+ return size.size() == 1 && size.contains(1L) &&
localState.size() == 2;
+ },
+ 20_000
+ ));
+ }
+
+ @WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG,
value = "true")
+ @Test
+ @ZoneParams(nodes = 2, replicas = 1, partitions = 2)
+ void testEstimatedRowsZone() throws Exception {
+ IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
+
+ insert(0, 0);
+ insert(1, 1);
+
+ // Wait for replication to finish.
+ assertTrue(waitForCondition(() -> {
+ CompletableFuture<Map<ZonePartitionId,
LocalPartitionStateByNode>> localStateTableFuture =
+
node.disasterRecoveryManager().localPartitionStates(Set.of(ZONE_NAME),
emptySet(), emptySet());
+
+ assertThat(localStateTableFuture,
willCompleteSuccessfully());
+
+ Map<ZonePartitionId, LocalPartitionStateByNode> localState;
+ try {
+ localState = localStateTableFuture.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+
+ Set<Long> size = localState.values().stream()
+ .flatMap(localTablePartitionStateByNode ->
localTablePartitionStateByNode.values().stream())
+ .map(state -> state.estimatedRows)
+ .collect(Collectors.toSet());
+ // There are 2 nodes, 2 partitions and 1 replica, so we
should have 2 entries in localState (one for each partition),
+ // LocalPartitionStateByNode should have an entry for
either the first or the second node with 1 row.
+ return size.size() == 1 && size.contains(1L) &&
localState.size() == 2;
+ },
+ 20_000
+ ));
+ }
+
@Test
@ZoneParams(nodes = 2, replicas = 2, partitions = 2)
void testLocalPartitionStateTable() throws Exception {