This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8749e4da5a [IGNITE-22121] Change parameters for disaster recovery
partition states api (#3671)
8749e4da5a is described below
commit 8749e4da5a15a4f78edeb06972135289cd18e3d9
Author: Phillippko <[email protected]>
AuthorDate: Tue Apr 30 18:04:25 2024 +0400
[IGNITE-22121] Change parameters for disaster recovery partition states api
(#3671)
---
.../java/org/apache/ignite/lang/ErrorGroups.java | 16 ++
modules/platforms/cpp/ignite/common/error_codes.h | 10 +-
modules/platforms/cpp/ignite/odbc/common_types.cpp | 6 +
.../platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs | 20 ++
.../rest/api/recovery/DisasterRecoveryApi.java | 44 ++--
.../api/recovery/GlobalPartitionStateResponse.java | 8 +
.../recovery/GlobalPartitionStatesResponse.java | 3 +
.../api/recovery/LocalPartitionStateResponse.java | 8 +
.../api/recovery/LocalPartitionStatesResponse.java | 3 +
.../handler/IgniteInternalExceptionHandler.java | 12 +-
.../recovery/ItDisasterRecoveryControllerTest.java | 243 +++++++++++++++++----
.../rest/recovery/DisasterRecoveryController.java | 43 ++--
.../ItDisasterRecoveryReconfigurationTest.java | 7 +-
.../disaster/DisasterRecoveryManager.java | 243 +++++++++++++++------
.../distributed/disaster/GlobalPartitionState.java | 5 +-
.../distributed/disaster/LocalPartitionState.java | 5 +-
...onState.java => LocalPartitionStateByNode.java} | 34 ++-
.../disaster/LocalPartitionStateMessageByNode.java | 53 +++++
.../DisasterRecoveryException.java} | 22 +-
.../NodesNotFoundException.java} | 20 +-
.../PartitionsNotFoundException.java} | 20 +-
.../ZonesNotFoundException.java} | 20 +-
.../messages/LocalPartitionStatesRequest.java | 7 +-
23 files changed, 636 insertions(+), 216 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index e61a7e136b..1254482d3b 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -605,4 +605,20 @@ public class ErrorGroups {
/** System-critical operation timed out. */
public static final int SYSTEM_CRITICAL_OPERATION_TIMEOUT_ERR =
CRITICAL_WORKERS_ERR_GROUP.registerErrorCode((short) 2);
}
+
+ /** Disaster recovery error group. */
+ @ErrorCodeGroup
+ public static class DisasterRecovery {
+ /** Disaster recovery group. */
+ public static final ErrorGroup RECOVERY_ERR_GROUP =
registerGroup("RECOVERY", (short) 20);
+
+ /** Partitions were not found. */
+ public static final int PARTITIONS_NOT_FOUND_ERR =
RECOVERY_ERR_GROUP.registerErrorCode((short) 1);
+
+ /** Nodes were not found. */
+ public static final int NODES_NOT_FOUND_ERR =
RECOVERY_ERR_GROUP.registerErrorCode((short) 2);
+
+ /** Error while returning partition states. */
+ public static final int PARTITION_STATE_ERR =
RECOVERY_ERR_GROUP.registerErrorCode((short) 3);
+ }
}
diff --git a/modules/platforms/cpp/ignite/common/error_codes.h
b/modules/platforms/cpp/ignite/common/error_codes.h
index f728001528..4bdb6d079b 100644
--- a/modules/platforms/cpp/ignite/common/error_codes.h
+++ b/modules/platforms/cpp/ignite/common/error_codes.h
@@ -49,7 +49,8 @@ enum class group : underlying_t {
COMPUTE = 0x10,
CATALOG = 0x11,
PLACEMENTDRIVER = 0x12,
- WORKERS = 0x13
+ WORKERS = 0x13,
+ RECOVERY = 0x14
};
inline group get_group_by_error_code(const underlying_t code) {
@@ -203,7 +204,12 @@ enum class code : underlying_t {
// CriticalWorkers group. Group code: 19
SYSTEM_WORKER_BLOCKED = 0x130001,
- SYSTEM_CRITICAL_OPERATION_TIMEOUT = 0x130002
+ SYSTEM_CRITICAL_OPERATION_TIMEOUT = 0x130002,
+
+ // DisasterRecovery group. Group code: 20
+ PARTITIONS_NOT_FOUND = 0x140001,
+ NODES_NOT_FOUND = 0x140002,
+ PARTITION_STATE = 0x140003
};
} // namespace error
diff --git a/modules/platforms/cpp/ignite/odbc/common_types.cpp
b/modules/platforms/cpp/ignite/odbc/common_types.cpp
index 6cdb6863d8..7c420aae7e 100644
--- a/modules/platforms/cpp/ignite/odbc/common_types.cpp
+++ b/modules/platforms/cpp/ignite/odbc/common_types.cpp
@@ -296,6 +296,12 @@ sql_state error_code_to_sql_state(error::code code) {
case error::code::SYSTEM_WORKER_BLOCKED:
case error::code::SYSTEM_CRITICAL_OPERATION_TIMEOUT:
return sql_state::SHY000_GENERAL_ERROR;
+
+ // DisasterRecovery group. Group code: 20
+ case error::code::NODES_NOT_FOUND:
+ case error::code::PARTITIONS_NOT_FOUND:
+ case error::code::PARTITION_STATE:
+ return sql_state::SHY000_GENERAL_ERROR;
}
return sql_state::SHY000_GENERAL_ERROR;
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
index 1a55aa4850..e96965a236 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
@@ -48,6 +48,7 @@ namespace Apache.Ignite
Catalog.GroupCode => Catalog.GroupName,
PlacementDriver.GroupCode => PlacementDriver.GroupName,
CriticalWorkers.GroupCode => CriticalWorkers.GroupName,
+ DisasterRecovery.GroupCode => DisasterRecovery.GroupName,
_ => UnknownGroupName
};
@@ -568,5 +569,24 @@ namespace Apache.Ignite
/// <summary> SystemCriticalOperationTimeout error. </summary>
public const int SystemCriticalOperationTimeout = (GroupCode <<
16) | (2 & 0xFFFF);
}
+
+ /// <summary> DisasterRecovery errors. </summary>
+ public static class DisasterRecovery
+ {
+ /// <summary> DisasterRecovery group code. </summary>
+ public const short GroupCode = 20;
+
+ /// <summary> DisasterRecovery group name. </summary>
+ public const String GroupName = "RECOVERY";
+
+ /// <summary> PartitionsNotFound error. </summary>
+ public const int PartitionsNotFound = (GroupCode << 16) | (1 &
0xFFFF);
+
+ /// <summary> NodesNotFound error. </summary>
+ public const int NodesNotFound = (GroupCode << 16) | (2 & 0xFFFF);
+
+ /// <summary> PartitionState error. </summary>
+ public const int PartitionState = (GroupCode << 16) | (3 & 0xFFFF);
+ }
}
}
diff --git
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java
index 4fa1b92445..78be16d189 100644
---
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java
+++
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java
@@ -19,13 +19,15 @@ package org.apache.ignite.internal.rest.api.recovery;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
-import io.micronaut.http.annotation.PathVariable;
import io.micronaut.http.annotation.Produces;
+import io.micronaut.http.annotation.QueryValue;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
+import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.rest.api.Problem;
import org.apache.ignite.internal.rest.constants.MediaType;
@@ -41,34 +43,34 @@ public interface DisasterRecoveryApi {
@ApiResponse(responseCode = "200", description = "Partition states
returned.")
@ApiResponse(responseCode = "500", description = "Internal error.",
content = @Content(mediaType = MediaType.PROBLEM_JSON, schema =
@Schema(implementation = Problem.class)))
- @Produces(MediaType.APPLICATION_JSON)
- CompletableFuture<LocalPartitionStatesResponse> getLocalPartitionStates();
-
- @Get("state/local/{zoneName}")
- @Operation(operationId = "getLocalPartitionStatesByZone", description =
"Returns local partition states.")
- @ApiResponse(responseCode = "200", description = "Partition states
returned.")
- @ApiResponse(responseCode = "500", description = "Internal error.",
- content = @Content(mediaType = MediaType.PROBLEM_JSON, schema =
@Schema(implementation = Problem.class)))
- @ApiResponse(responseCode = "404", description = "Zone is not found.",
+ @ApiResponse(responseCode = "400", description = "Bad request.",
content = @Content(mediaType = MediaType.PROBLEM_JSON, schema =
@Schema(implementation = Problem.class)))
@Produces(MediaType.APPLICATION_JSON)
- CompletableFuture<LocalPartitionStatesResponse>
getLocalPartitionStates(@PathVariable("zoneName") String zoneName);
+ CompletableFuture<LocalPartitionStatesResponse> getLocalPartitionStates(
+ @QueryValue
+ @Schema(description = "Names specifying zones to get partition
states from. Case-sensitive, all zones if empty.")
+ Optional<Set<String>> zoneNames,
+ @QueryValue
+ @Schema(description = "Names specifying nodes to get partition
states from. Case-sensitive, all nodes if empty.")
+ Optional<Set<String>> nodeNames,
+ @QueryValue
+ @Schema(description = "IDs of partitions to get states. All
partitions if empty.") Optional<Set<Integer>> partitionIds
+ );
@Get("state/global")
@Operation(operationId = "getGlobalPartitionStates", description =
"Returns global partition states.")
@ApiResponse(responseCode = "200", description = "Partition states
returned.")
@ApiResponse(responseCode = "500", description = "Internal error.",
content = @Content(mediaType = MediaType.PROBLEM_JSON, schema =
@Schema(implementation = Problem.class)))
- @Produces(MediaType.APPLICATION_JSON)
- CompletableFuture<GlobalPartitionStatesResponse>
getGlobalPartitionStates();
-
- @Get("state/global/{zoneName}")
- @Operation(operationId = "getGlobalPartitionStatesByZone", description =
"Returns global partition states.")
- @ApiResponse(responseCode = "200", description = "Partition states
returned.")
- @ApiResponse(responseCode = "500", description = "Internal error.",
- content = @Content(mediaType = MediaType.PROBLEM_JSON, schema =
@Schema(implementation = Problem.class)))
- @ApiResponse(responseCode = "404", description = "Zone is not found.",
+ @ApiResponse(responseCode = "400", description = "Bad request.",
content = @Content(mediaType = MediaType.PROBLEM_JSON, schema =
@Schema(implementation = Problem.class)))
@Produces(MediaType.APPLICATION_JSON)
- CompletableFuture<GlobalPartitionStatesResponse>
getGlobalPartitionStates(@PathVariable("zoneName") String zoneName);
+ CompletableFuture<GlobalPartitionStatesResponse> getGlobalPartitionStates(
+ @QueryValue
+ @Schema(description = "Names specifying zones to get partition
states from. Case-sensitive, all zones if empty.")
+ Optional<Set<String>> zoneNames,
+ @QueryValue
+ @Schema(description = "IDs of partitions to get states of. All
partitions if empty.")
+ Optional<Set<Integer>> partitionIds
+ );
}
diff --git
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/GlobalPartitionStateResponse.java
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/GlobalPartitionStateResponse.java
index 20dbbe9549..e3c8141553 100644
---
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/GlobalPartitionStateResponse.java
+++
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/GlobalPartitionStateResponse.java
@@ -28,6 +28,7 @@ import io.swagger.v3.oas.annotations.media.Schema;
@Schema(description = "Information about global partition state.")
public class GlobalPartitionStateResponse {
private final int partitionId;
+ private final String zoneName;
private final String tableName;
private final String state;
@@ -38,10 +39,12 @@ public class GlobalPartitionStateResponse {
public GlobalPartitionStateResponse(
@JsonProperty("partitionId") int partitionId,
@JsonProperty("tableName") String tableName,
+ @JsonProperty("zoneName") String zoneName,
@JsonProperty("state") String state
) {
this.partitionId = partitionId;
this.tableName = tableName;
+ this.zoneName = zoneName;
this.state = state;
}
@@ -55,6 +58,11 @@ public class GlobalPartitionStateResponse {
return tableName;
}
+ @JsonGetter("zoneName")
+ public String zoneName() {
+ return zoneName;
+ }
+
@JsonGetter("state")
public String state() {
return state;
diff --git
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/GlobalPartitionStatesResponse.java
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/GlobalPartitionStatesResponse.java
index 5031abe864..17657b3df3 100644
---
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/GlobalPartitionStatesResponse.java
+++
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/GlobalPartitionStatesResponse.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.rest.api.recovery;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.v3.oas.annotations.media.Schema;
import java.util.List;
@@ -28,7 +29,9 @@ import java.util.List;
*/
@Schema(description = "Information about global partition states.")
public class GlobalPartitionStatesResponse {
+ // Using JsonInclude to handle empty list correctly.
@Schema
+ @JsonInclude
private final List<GlobalPartitionStateResponse> states;
@JsonCreator
diff --git
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/LocalPartitionStateResponse.java
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/LocalPartitionStateResponse.java
index c9d45b5c61..f7d8679fb6 100644
---
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/LocalPartitionStateResponse.java
+++
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/LocalPartitionStateResponse.java
@@ -28,6 +28,7 @@ import io.swagger.v3.oas.annotations.media.Schema;
@Schema(description = "Information about local partition state.")
public class LocalPartitionStateResponse {
private final int partitionId;
+ private final String zoneName;
private final String tableName;
private final String nodeName;
private final String state;
@@ -39,11 +40,13 @@ public class LocalPartitionStateResponse {
public LocalPartitionStateResponse(
@JsonProperty("partitionId") int partitionId,
@JsonProperty("tableName") String tableName,
+ @JsonProperty("zoneName") String zoneName,
@JsonProperty("nodeName") String nodeName,
@JsonProperty("state") String state
) {
this.partitionId = partitionId;
this.tableName = tableName;
+ this.zoneName = zoneName;
this.nodeName = nodeName;
this.state = state;
}
@@ -63,6 +66,11 @@ public class LocalPartitionStateResponse {
return nodeName;
}
+ @JsonGetter("zoneName")
+ public String zoneName() {
+ return zoneName;
+ }
+
@JsonGetter("state")
public String state() {
return state;
diff --git
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/LocalPartitionStatesResponse.java
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/LocalPartitionStatesResponse.java
index 21c117434f..309cb081ec 100644
---
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/LocalPartitionStatesResponse.java
+++
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/LocalPartitionStatesResponse.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.rest.api.recovery;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.v3.oas.annotations.media.Schema;
import java.util.List;
@@ -28,7 +29,9 @@ import java.util.List;
*/
@Schema(description = "Information about local partition states.")
public class LocalPartitionStatesResponse {
+ // Using JsonInclude to handle empty list correctly.
@Schema
+ @JsonInclude
private final List<LocalPartitionStateResponse> states;
@JsonCreator
diff --git
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteInternalExceptionHandler.java
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteInternalExceptionHandler.java
index 98e0d335aa..1b1e63b267 100644
---
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteInternalExceptionHandler.java
+++
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteInternalExceptionHandler.java
@@ -22,10 +22,12 @@ import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.server.exceptions.ExceptionHandler;
import jakarta.inject.Singleton;
+import java.util.Set;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.rest.api.Problem;
import org.apache.ignite.internal.rest.constants.HttpCode;
import org.apache.ignite.internal.rest.problem.HttpProblemResponse;
+import org.apache.ignite.lang.ErrorGroups.DisasterRecovery;
import org.apache.ignite.lang.ErrorGroups.DistributionZones;
/**
@@ -35,11 +37,17 @@ import org.apache.ignite.lang.ErrorGroups.DistributionZones;
@Requires(classes = {IgniteInternalException.class, ExceptionHandler.class})
public class IgniteInternalExceptionHandler implements
ExceptionHandler<IgniteInternalException, HttpResponse<? extends Problem>> {
+ private static final Set<Integer> BAD_REQUEST_CODES = Set.of(
+ DistributionZones.ZONE_NOT_FOUND_ERR,
+ DisasterRecovery.PARTITIONS_NOT_FOUND_ERR,
+ DisasterRecovery.NODES_NOT_FOUND_ERR
+ );
+
@Override
public HttpResponse<? extends Problem> handle(HttpRequest request,
IgniteInternalException exception) {
- if (exception.code() == DistributionZones.ZONE_NOT_FOUND_ERR) {
+ if (BAD_REQUEST_CODES.contains(exception.code())) {
return HttpProblemResponse.from(
- Problem.fromHttpCode(HttpCode.NOT_FOUND)
+ Problem.fromHttpCode(HttpCode.BAD_REQUEST)
.detail(exception.getMessage())
.traceId(exception.traceId())
.code(exception.codeAsString())
diff --git
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
index 2d3a92ab61..e88c73b25d 100644
---
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
+++
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
@@ -18,139 +18,292 @@
package org.apache.ignite.internal.rest.recovery;
import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
import static java.util.stream.IntStream.range;
import static
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasItem;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.client.HttpClient;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import org.apache.ignite.internal.Cluster;
-import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
import
org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStateResponse;
import
org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStatesResponse;
import
org.apache.ignite.internal.rest.api.recovery.LocalPartitionStateResponse;
import
org.apache.ignite.internal.rest.api.recovery.LocalPartitionStatesResponse;
+import org.apache.ignite.internal.util.CollectionUtils;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
/**
* Test for disaster recovery REST commands.
*/
@MicronautTest
-public class ItDisasterRecoveryControllerTest extends
ClusterPerTestIntegrationTest {
+public class ItDisasterRecoveryControllerTest extends
ClusterPerClassIntegrationTest {
private static final String NODE_URL = "http://localhost:" +
Cluster.BASE_HTTP_PORT;
+ private static final Set<String> ZONES = Set.of("first_ZONE",
"second_ZONE", "third_ZONE");
+
+ private static final Set<String> MIXED_CASE_ZONES =
Set.of("mixed_first_zone", "MIXED_FIRST_ZONE", "mixed_second_zone",
+ "MIXED_SECOND_ZONE");
+
+ private static final Set<String> ZONES_CONTAINING_TABLES = new
HashSet<>(CollectionUtils.concat(ZONES, MIXED_CASE_ZONES));
+
+ private static final String EMPTY_ZONE = "empty_ZONE";
+
+ private static final Set<String> TABLE_NAMES =
ZONES_CONTAINING_TABLES.stream().map(it -> it + "_table").collect(toSet());
+
+ private static final Set<String> STATES = Set.of("HEALTHY", "AVAILABLE");
+
+ private static Set<String> nodeNames;
+
@Inject
@Client(NODE_URL + "/management/v1/recovery/")
HttpClient client;
- @Override
- protected int initialNodes() {
- return 1;
+ @BeforeAll
+ public static void setUp() {
+ ZONES_CONTAINING_TABLES.forEach(name -> {
+ sql(String.format("CREATE ZONE \"%s\" WITH storage_profiles='%s'",
name, DEFAULT_AIPERSIST_PROFILE_NAME));
+ sql(String.format("CREATE TABLE \"%s_table\" (id INT PRIMARY KEY,
val INT) WITH PRIMARY_ZONE = '%1$s'", name));
+ });
+
+ sql(String.format("CREATE ZONE \"%s\" WITH storage_profiles='%s'",
EMPTY_ZONE, DEFAULT_AIPERSIST_PROFILE_NAME));
+
+ nodeNames =
CLUSTER.runningNodes().map(IgniteImpl::name).collect(toSet());
}
@Test
void testLocalPartitionStates() {
- executeSql("CREATE TABLE foo (id INT PRIMARY KEY, val INT)");
var response = client.toBlocking().exchange("/state/local/",
LocalPartitionStatesResponse.class);
assertEquals(HttpStatus.OK, response.status());
- LocalPartitionStatesResponse body = response.body();
- assertEquals(DEFAULT_PARTITION_COUNT, body.states().size());
+ List<LocalPartitionStateResponse> states = response.body().states();
+
+ assertFalse(states.isEmpty());
- List<Integer> partitionIds =
body.states().stream().map(LocalPartitionStateResponse::partitionId).collect(toList());
+ List<Integer> partitionIds =
states.stream().map(LocalPartitionStateResponse::partitionId).distinct().collect(toList());
assertEquals(range(0,
DEFAULT_PARTITION_COUNT).boxed().collect(toList()), partitionIds);
+
+ checkLocalStates(states, ZONES_CONTAINING_TABLES, nodeNames);
}
@Test
- void testLocalPartitionStatesByZoneMissingZone() {
+ void testLocalPartitionStatesNodeNotFound() {
HttpClientResponseException thrown = assertThrows(
HttpClientResponseException.class,
- () ->
client.toBlocking().exchange("/state/local/no-such-zone/",
LocalPartitionStatesResponse.class)
+ () ->
client.toBlocking().exchange("/state/local?nodeNames=no-such-node",
LocalPartitionStatesResponse.class)
);
- assertEquals(HttpStatus.NOT_FOUND, thrown.getResponse().status());
+ assertEquals(HttpStatus.BAD_REQUEST, thrown.getResponse().status());
+ assertThat(thrown.getMessage(), containsString("Some nodes are
missing: [no-such-node]"));
}
@Test
- void testLocalPartitionStatesByZone() {
- executeSql("CREATE TABLE def (id INT PRIMARY KEY, val INT)");
+ void testLocalPartitionStatesZoneNotFound() {
+ HttpClientResponseException thrown = assertThrows(
+ HttpClientResponseException.class,
+ () ->
client.toBlocking().exchange("/state/local?zoneNames=no-such-zone",
LocalPartitionStatesResponse.class)
+ );
- executeSql("CREATE ZONE foo WITH partitions=1, storage_profiles='" +
DEFAULT_AIPERSIST_PROFILE_NAME + "'");
- executeSql("CREATE TABLE foo (id INT PRIMARY KEY, val INT) WITH
PRIMARY_ZONE = 'FOO'");
+ assertEquals(HttpStatus.BAD_REQUEST, thrown.getResponse().status());
+ assertThat(thrown.getMessage(), containsString("Some distribution
zones are missing: [no-such-zone]"));
+ }
- var response = client.toBlocking().exchange("/state/local/Default/",
LocalPartitionStatesResponse.class);
+ @Test
+ void testLocalPartitionStatesPartitionNotFound() {
+ HttpClientResponseException thrown = assertThrows(
+ HttpClientResponseException.class,
+ () ->
client.toBlocking().exchange("/state/local?partitionIds=-1",
LocalPartitionStatesResponse.class)
+ );
+
+ assertEquals(HttpStatus.BAD_REQUEST, thrown.getResponse().status());
+ }
+
+ @Test
+ void testLocalPartitionsEmptyResult() {
+ HttpResponse<LocalPartitionStatesResponse> response =
client.toBlocking().exchange(
+ "/state/local?zoneNames=" + EMPTY_ZONE,
+ LocalPartitionStatesResponse.class
+ );
assertEquals(HttpStatus.OK, response.status());
- assertEquals(DEFAULT_PARTITION_COUNT, response.body().states().size());
+ assertEquals(0, response.body().states().size());
+ }
+
+ @Test
+ void testLocalPartitionStatesByZones() {
+ String url = "state/local?zoneNames=" + String.join(",", ZONES);
+
+ var response = client.toBlocking().exchange(url,
LocalPartitionStatesResponse.class);
+
+ assertEquals(HttpStatus.OK, response.status());
+
+ checkLocalStates(response.body().states(), ZONES, nodeNames);
+ }
+
+ @Test
+ void testLocalPartitionStatesByZonesCheckCase() {
+ String url = "state/local?zoneNames=" + String.join(",",
MIXED_CASE_ZONES);
+
+ var response = client.toBlocking().exchange(url,
LocalPartitionStatesResponse.class);
+
+ assertEquals(HttpStatus.OK, response.status());
+
+ checkLocalStates(response.body().states(), MIXED_CASE_ZONES,
nodeNames);
+ }
+
+ @Test
+ void testLocalPartitionStatesByNodes() {
+ Set<String> nodeNames = Set.of(CLUSTER.node(0).node().name(),
CLUSTER.node(1).node().name());
+
+ String url = "state/local?nodeNames=" + String.join(",", nodeNames);
- response = client.toBlocking().exchange("/state/local/FOO/",
LocalPartitionStatesResponse.class);
+ var response = client.toBlocking().exchange(url,
LocalPartitionStatesResponse.class);
+
+ assertEquals(HttpStatus.OK, response.status());
+
+ checkLocalStates(response.body().states(), ZONES_CONTAINING_TABLES,
nodeNames);
+ }
+
+ @Test
+ void testLocalPartitionStatesByNodesIsCaseSensitive() {
+ Set<String> nodeNames = Set.of(CLUSTER.node(0).node().name(),
CLUSTER.node(1).node().name());
+
+ String url = "state/local?nodeNames=" + String.join(",",
nodeNames).toUpperCase();
+
+ HttpClientResponseException thrown = assertThrows(
+ HttpClientResponseException.class,
+ () -> client.toBlocking().exchange(url,
LocalPartitionStatesResponse.class)
+ );
+
+ nodeNames.forEach(nodeName -> assertThat(thrown.getMessage(),
containsString(nodeName.toUpperCase())));
+ }
+
+ @Test
+ void testLocalPartitionStatesByPartitions() {
+ Set<String> partitionIds = Set.of("1", "2");
+
+ String url = "state/local?partitionIds=" + String.join(",",
partitionIds);
+
+ var response = client.toBlocking().exchange(url,
LocalPartitionStatesResponse.class);
assertEquals(HttpStatus.OK, response.status());
List<LocalPartitionStateResponse> states = response.body().states();
- assertEquals(1, states.size());
- LocalPartitionStateResponse state = states.get(0);
- assertEquals(0, state.partitionId());
- assertEquals("idrct_tlpsbz_0", state.nodeName());
- assertEquals("FOO", state.tableName());
- assertEquals("HEALTHY", state.state());
+ for (LocalPartitionStateResponse state : states) {
+
assertTrue(partitionIds.contains((String.valueOf(state.partitionId()))));
+ }
+
+ checkLocalStates(states, ZONES_CONTAINING_TABLES, nodeNames);
}
@Test
void testGlobalPartitionStates() {
- executeSql("CREATE TABLE foo (id INT PRIMARY KEY, val INT)");
var response = client.toBlocking().exchange("/state/global/",
GlobalPartitionStatesResponse.class);
assertEquals(HttpStatus.OK, response.status());
- GlobalPartitionStatesResponse body = response.body();
- assertEquals(DEFAULT_PARTITION_COUNT, body.states().size());
+ List<GlobalPartitionStateResponse> states = response.body().states();
+ assertFalse(response.body().states().isEmpty());
- List<Integer> partitionIds =
body.states().stream().map(GlobalPartitionStateResponse::partitionId).collect(toList());
+ List<Integer> partitionIds =
states.stream().map(GlobalPartitionStateResponse::partitionId).distinct().collect(toList());
assertEquals(range(0,
DEFAULT_PARTITION_COUNT).boxed().collect(toList()), partitionIds);
+
+ checkGlobalStates(states, ZONES_CONTAINING_TABLES);
+ }
+
+ @Test
+ void testGlobalPartitionStatesZoneNotFound() {
+ HttpClientResponseException thrown = assertThrows(
+ HttpClientResponseException.class,
+ () ->
client.toBlocking().exchange("/state/global?zoneNames=no-such-zone",
GlobalPartitionStatesResponse.class)
+ );
+
+ assertEquals(HttpStatus.BAD_REQUEST, thrown.getResponse().status());
+ assertThat(thrown.getMessage(), containsString("Some distribution
zones are missing: [no-such-zone]"));
}
@Test
- void testGlobalPartitionStatesByZoneMissingZone() {
+ void testGlobalPartitionStatesPartitionNotFound() {
HttpClientResponseException thrown = assertThrows(
HttpClientResponseException.class,
- () ->
client.toBlocking().exchange("/state/global/no-such-zone/",
GlobalPartitionStatesResponse.class)
+ () ->
client.toBlocking().exchange("/state/global?partitionIds=-1",
GlobalPartitionStatesResponse.class)
);
- assertEquals(HttpStatus.NOT_FOUND, thrown.getResponse().status());
+ assertEquals(HttpStatus.BAD_REQUEST, thrown.getResponse().status());
+ assertThat(thrown.getMessage(), containsString("Some partitions are
missing: [-1]"));
}
@Test
- void testGlobalPartitionStatesByZone() {
- executeSql("CREATE TABLE def (id INT PRIMARY KEY, val INT)");
+ void testGlobalPartitionsEmptyResult() {
+ HttpResponse<GlobalPartitionStatesResponse> response =
client.toBlocking().exchange(
+ "/state/global?zoneNames=" + EMPTY_ZONE,
+ GlobalPartitionStatesResponse.class
+ );
- executeSql("CREATE ZONE foo WITH partitions=1, storage_profiles='" +
DEFAULT_AIPERSIST_PROFILE_NAME + "'");
- executeSql("CREATE TABLE foo (id INT PRIMARY KEY, val INT) WITH
PRIMARY_ZONE = 'FOO'");
+ assertEquals(HttpStatus.OK, response.status());
+ assertEquals(0, response.body().states().size());
+ }
+
+ @Test
+ void testGlobalPartitionStatesByZones() {
+ String url = "state/global?zoneNames=" + String.join(",", ZONES);
- var response = client.toBlocking().exchange("/state/global/Default/",
GlobalPartitionStatesResponse.class);
+ var response = client.toBlocking().exchange(url,
GlobalPartitionStatesResponse.class);
assertEquals(HttpStatus.OK, response.status());
- assertEquals(DEFAULT_PARTITION_COUNT, response.body().states().size());
- response = client.toBlocking().exchange("/state/global/FOO/",
GlobalPartitionStatesResponse.class);
+ checkGlobalStates(response.body().states(), ZONES);
+ }
+
+ @Test
+ void testGlobalPartitionStatesByZonesCheckCase() {
+ String url = "state/global?zoneNames=" + String.join(",",
MIXED_CASE_ZONES);
+
+ var response = client.toBlocking().exchange(url,
GlobalPartitionStatesResponse.class);
assertEquals(HttpStatus.OK, response.status());
- List<GlobalPartitionStateResponse> states = response.body().states();
- assertEquals(1, states.size());
+ checkGlobalStates(response.body().states(), MIXED_CASE_ZONES);
+ }
+
+ private static void checkLocalStates(List<LocalPartitionStateResponse>
states, Set<String> zoneNames, Set<String> nodes) {
+ assertFalse(states.isEmpty());
+
+ states.forEach(state -> {
+ assertThat(zoneNames, hasItem(state.zoneName()));
+ assertThat(nodes, hasItem(state.nodeName()));
+ assertThat(TABLE_NAMES, hasItem(state.tableName()));
+ assertThat(STATES, hasItem(state.state()));
+ });
+ }
+
+ private static void checkGlobalStates(List<GlobalPartitionStateResponse>
states, Set<String> zoneNames) {
+ assertFalse(states.isEmpty());
- GlobalPartitionStateResponse state = states.get(0);
- assertEquals(0, state.partitionId());
- assertEquals("FOO", state.tableName());
- assertEquals("AVAILABLE", state.state());
+ states.forEach(state -> {
+ assertThat(zoneNames, hasItem(state.zoneName()));
+ assertThat(TABLE_NAMES, hasItem(state.tableName()));
+ assertThat(STATES, hasItem(state.state()));
+ });
}
}
diff --git
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
index 4598ab995f..182a0a5d27 100644
---
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
+++
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
@@ -25,6 +25,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.rest.api.recovery.DisasterRecoveryApi;
@@ -36,6 +38,7 @@ import
org.apache.ignite.internal.rest.exception.handler.IgniteInternalException
import
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager;
import
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionState;
import
org.apache.ignite.internal.table.distributed.disaster.LocalPartitionState;
+import
org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateByNode;
/**
* Disaster recovery controller.
@@ -50,29 +53,35 @@ public class DisasterRecoveryController implements
DisasterRecoveryApi {
}
@Override
- public CompletableFuture<LocalPartitionStatesResponse>
getLocalPartitionStates() {
- return
disasterRecoveryManager.localPartitionStates(null).thenApply(DisasterRecoveryController::convertLocalStates);
+ public CompletableFuture<LocalPartitionStatesResponse>
getLocalPartitionStates(
+ Optional<Set<String>> zoneNames,
+ Optional<Set<String>> nodeNames,
+ Optional<Set<Integer>> partitionIds
+ ) {
+ return disasterRecoveryManager.localPartitionStates(
+ zoneNames.orElse(Set.of()),
+ nodeNames.orElse(Set.of()),
+ partitionIds.orElse(Set.of())
+ )
+ .thenApply(DisasterRecoveryController::convertLocalStates);
}
@Override
- public CompletableFuture<LocalPartitionStatesResponse>
getLocalPartitionStates(String zoneName) {
- return
disasterRecoveryManager.localPartitionStates(zoneName).thenApply(DisasterRecoveryController::convertLocalStates);
+ public CompletableFuture<GlobalPartitionStatesResponse>
getGlobalPartitionStates(
+ Optional<Set<String>> zoneNames,
+ Optional<Set<Integer>> partitionIds
+ ) {
+ return disasterRecoveryManager.globalPartitionStates(
+ zoneNames.orElse(Set.of()),
+ partitionIds.orElse(Set.of())
+ )
+ .thenApply(DisasterRecoveryController::convertGlobalStates);
}
- @Override
- public CompletableFuture<GlobalPartitionStatesResponse>
getGlobalPartitionStates() {
- return
disasterRecoveryManager.globalPartitionStates(null).thenApply(DisasterRecoveryController::convertGlobalStates);
- }
-
- @Override
- public CompletableFuture<GlobalPartitionStatesResponse>
getGlobalPartitionStates(String zoneName) {
- return
disasterRecoveryManager.globalPartitionStates(zoneName).thenApply(DisasterRecoveryController::convertGlobalStates);
- }
-
- private static LocalPartitionStatesResponse
convertLocalStates(Map<TablePartitionId, Map<String, LocalPartitionState>>
localStates) {
+ private static LocalPartitionStatesResponse
convertLocalStates(Map<TablePartitionId, LocalPartitionStateByNode>
localStates) {
List<LocalPartitionStateResponse> states = new ArrayList<>();
- for (Map<String, LocalPartitionState> map : localStates.values()) {
+ for (LocalPartitionStateByNode map : localStates.values()) {
for (Entry<String, LocalPartitionState> entry : map.entrySet()) {
String nodeName = entry.getKey();
LocalPartitionState state = entry.getValue();
@@ -80,6 +89,7 @@ public class DisasterRecoveryController implements
DisasterRecoveryApi {
states.add(new LocalPartitionStateResponse(
state.partitionId,
state.tableName,
+ state.zoneName,
nodeName,
state.state.name()
));
@@ -101,6 +111,7 @@ public class DisasterRecoveryController implements
DisasterRecoveryApi {
states.add(new GlobalPartitionStateResponse(
state.partitionId,
state.tableName,
+ state.zoneName,
state.state.name()
));
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index b0d502878c..f128f9913a 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -60,7 +60,7 @@ import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.TableManager;
-import
org.apache.ignite.internal.table.distributed.disaster.LocalPartitionState;
+import
org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateByNode;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.table.KeyValueView;
@@ -326,10 +326,11 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
}
private List<Integer> getRealAssignments(IgniteImpl node0, int partId) {
- var partitionStatesFut =
node0.disasterRecoveryManager().localPartitionStates(zoneName);
+ CompletableFuture<Map<TablePartitionId, LocalPartitionStateByNode>>
partitionStatesFut = node0.disasterRecoveryManager()
+ .localPartitionStates(Set.of(zoneName), Set.of(), Set.of());
assertThat(partitionStatesFut, willCompleteSuccessfully());
- Map<String, LocalPartitionState> partitionStates =
partitionStatesFut.join().get(new TablePartitionId(tableId, partId));
+ LocalPartitionStateByNode partitionStates =
partitionStatesFut.join().get(new TablePartitionId(tableId, partId));
return partitionStates.keySet()
.stream()
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index 66559dfc01..6907623397 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -18,8 +18,11 @@
package org.apache.ignite.internal.table.distributed.disaster;
import static java.util.Collections.emptyList;
+import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.AVAILABLE;
import static
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.DEGRADED;
import static
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.READ_ONLY;
@@ -29,12 +32,13 @@ import static
org.apache.ignite.internal.table.distributed.disaster.LocalPartiti
import static
org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateEnum.INITIALIZING;
import static
org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateEnum.INSTALLING_SNAPSHOT;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static
org.apache.ignite.lang.ErrorGroups.DisasterRecovery.PARTITION_STATE_ERR;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -44,11 +48,11 @@ import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.affinity.Assignments;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.distributionzones.NodeWithAttributes;
-import
org.apache.ignite.internal.distributionzones.exception.DistributionZoneNotFoundException;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -64,10 +68,15 @@ import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import
org.apache.ignite.internal.table.distributed.disaster.exceptions.DisasterRecoveryException;
+import
org.apache.ignite.internal.table.distributed.disaster.exceptions.NodesNotFoundException;
+import
org.apache.ignite.internal.table.distributed.disaster.exceptions.PartitionsNotFoundException;
+import
org.apache.ignite.internal.table.distributed.disaster.exceptions.ZonesNotFoundException;
import
org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStateMessage;
import
org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStatesRequest;
import
org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStatesResponse;
import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.raft.jraft.Node;
import org.apache.ignite.raft.jraft.core.State;
@@ -97,9 +106,6 @@ public class DisasterRecoveryManager implements
IgniteComponent {
*/
private static final int CATCH_UP_THRESHOLD = 100;
- /** Zone ID that corresponds to "all zones". */
- private static final int NO_ZONE_ID = -1;
-
/** Thread pool executor for async parts. */
private final ExecutorService threadPool;
@@ -195,65 +201,65 @@ public class DisasterRecoveryManager implements
IgniteComponent {
}
/**
- * Returns partition states for all zones' partitions in the cluster.
Result is a mapping of {@link TablePartitionId} to the mapping
+ * Returns states of partitions in the cluster. Result is a mapping of
{@link TablePartitionId} to the mapping
* between a node name and a partition state.
*
- * @param zoneName Zone name. {@code null} means "all zones".
+ * @param zoneNames Names specifying zones to get partition states from.
Case-sensitive, empty set means "all zones".
+ * @param nodeNames Names specifying nodes to get partition states from.
Case-sensitive, empty set means "all nodes".
+ * @param partitionIds IDs of partitions to get states of. Empty set means
"all partitions".
* @return Future with the mapping.
*/
- public CompletableFuture<Map<TablePartitionId, Map<String,
LocalPartitionState>>> localPartitionStates(@Nullable String zoneName) {
+ public CompletableFuture<Map<TablePartitionId, LocalPartitionStateByNode>>
localPartitionStates(
+ Set<String> zoneNames,
+ Set<String> nodeNames,
+ Set<Integer> partitionIds
+ ) {
Catalog catalog =
catalogManager.catalog(catalogManager.latestCatalogVersion());
- return localPartitionStatesInternal(zoneName, catalog)
+ return localPartitionStatesInternal(zoneNames, nodeNames,
partitionIds, catalog)
.thenApply(res -> normalizeLocal(res, catalog));
}
/**
- * Returns partition states for all zones' partitions in the cluster.
Result is a mapping of {@link TablePartitionId} to the global
+ * Returns states of partitions in the cluster. Result is a mapping of
{@link TablePartitionId} to the global
* partition state enum value.
*
- * @param zoneName Zone name. {@code null} means "all zones".
+ * @param zoneNames Names specifying zones to get partition states.
Case-sensitive, empty set means "all zones".
+ * @param partitionIds IDs of partitions to get states of. Empty set means
"all partitions".
* @return Future with the mapping.
*/
- public CompletableFuture<Map<TablePartitionId, GlobalPartitionState>>
globalPartitionStates(@Nullable String zoneName) {
+ public CompletableFuture<Map<TablePartitionId, GlobalPartitionState>>
globalPartitionStates(
+ Set<String> zoneNames,
+ Set<Integer> partitionIds
+ ) {
Catalog catalog =
catalogManager.catalog(catalogManager.latestCatalogVersion());
- return localPartitionStatesInternal(zoneName, catalog)
+ return localPartitionStatesInternal(zoneNames, Set.of(), partitionIds,
catalog)
.thenApply(res -> normalizeLocal(res, catalog))
.thenApply(res -> assembleGlobal(res, catalog));
}
- private CompletableFuture<Map<TablePartitionId, Map<String,
LocalPartitionStateMessage>>> localPartitionStatesInternal(
- @Nullable String zoneName, Catalog catalog
+ private CompletableFuture<Map<TablePartitionId,
LocalPartitionStateMessageByNode>> localPartitionStatesInternal(
+ Set<String> zoneNames,
+ Set<String> nodeNames,
+ Set<Integer> partitionIds,
+ Catalog catalog
) {
- int zoneId;
- if (zoneName == null) {
- zoneId = NO_ZONE_ID;
- } else {
- Optional<CatalogZoneDescriptor> zoneDesciptorOptional =
catalog.zones().stream()
- .filter(catalogZoneDescriptor ->
catalogZoneDescriptor.name().equals(zoneName))
- .findAny();
-
- if (zoneDesciptorOptional.isEmpty()) {
- return CompletableFuture.failedFuture(new
DistributionZoneNotFoundException(zoneName, null));
- }
+ Set<Integer> zoneIds = getZoneIds(zoneNames, catalog);
- CatalogZoneDescriptor zoneDescriptor = zoneDesciptorOptional.get();
- zoneId = zoneDescriptor.id();
- }
-
- Set<NodeWithAttributes> logicalTopology = dzManager.logicalTopology();
+ Set<NodeWithAttributes> nodes = getNodes(nodeNames);
LocalPartitionStatesRequest localPartitionStatesRequest =
MSG_FACTORY.localPartitionStatesRequest()
- .zoneId(zoneId)
+ .zoneIds(zoneIds)
+ .partitionIds(partitionIds)
.catalogVersion(catalog.version())
.build();
- Map<TablePartitionId, Map<String, LocalPartitionStateMessage>> result
= new ConcurrentHashMap<>();
- CompletableFuture<?>[] futures = new
CompletableFuture[logicalTopology.size()];
+ Map<TablePartitionId, LocalPartitionStateMessageByNode> result = new
ConcurrentHashMap<>();
+ CompletableFuture<?>[] futures = new CompletableFuture[nodes.size()];
int i = 0;
- for (NodeWithAttributes node : logicalTopology) {
+ for (NodeWithAttributes node : nodes) {
CompletableFuture<NetworkMessage> invokeFuture =
messagingService.invoke(
node.nodeName(),
localPartitionStatesRequest,
@@ -266,20 +272,80 @@ public class DisasterRecoveryManager implements
IgniteComponent {
var response = (LocalPartitionStatesResponse) networkMessage;
for (LocalPartitionStateMessage state : response.states()) {
- result.compute(state.partitionId().asTablePartitionId(),
(tablePartitionId, map) -> {
- if (map == null) {
- return Map.of(node.nodeName(), state);
+ result.compute(state.partitionId().asTablePartitionId(),
(tablePartitionId, messageByNode) -> {
+ if (messageByNode == null) {
+ return new
LocalPartitionStateMessageByNode(Map.of(node.nodeName(), state));
}
- map = new HashMap<>(map);
- map.put(node.nodeName(), state);
- return map;
+ messageByNode = new
LocalPartitionStateMessageByNode(messageByNode);
+ messageByNode.put(node.nodeName(), state);
+ return messageByNode;
});
}
});
}
- return CompletableFuture.allOf(futures).handle((unused, throwable) ->
result);
+ return allOf(futures).handle((unused, err) -> {
+ if (err != null) {
+ throw new DisasterRecoveryException(PARTITION_STATE_ERR, err);
+ }
+
+ if (!partitionIds.isEmpty()) {
+ Set<Integer> foundPartitionIds = result.keySet().stream()
+ .map(TablePartitionId::partitionId)
+ .collect(toSet());
+
+ checkPartitions(foundPartitionIds, partitionIds);
+ }
+
+ return result;
+ });
+ }
+
+ private Set<NodeWithAttributes> getNodes(Set<String> nodeNames) throws
NodesNotFoundException {
+ if (nodeNames.isEmpty()) {
+ return dzManager.logicalTopology();
+ }
+
+ Set<NodeWithAttributes> nodes = dzManager.logicalTopology().stream()
+ .filter(node -> nodeNames.contains(node.nodeName()))
+ .collect(toSet());
+
+ Set<String> foundNodeNames = nodes.stream()
+ .map(NodeWithAttributes::nodeName)
+ .collect(toSet());
+
+ if (!nodeNames.equals(foundNodeNames)) {
+ Set<String> missingNodeNames =
CollectionUtils.difference(nodeNames, foundNodeNames);
+
+ throw new NodesNotFoundException(missingNodeNames);
+ }
+
+ return nodes;
+ }
+
+ private static Set<Integer> getZoneIds(Set<String> zoneNames, Catalog
catalog) throws ZonesNotFoundException {
+ if (zoneNames.isEmpty()) {
+ return Set.of();
+ }
+
+ List<CatalogZoneDescriptor> zoneDescriptors = catalog.zones().stream()
+ .filter(catalogZoneDescriptor ->
zoneNames.contains(catalogZoneDescriptor.name()))
+ .collect(toList());
+
+ Set<String> foundZoneNames = zoneDescriptors.stream()
+ .map(CatalogObjectDescriptor::name)
+ .collect(toSet());
+
+ if (!zoneNames.equals(foundZoneNames)) {
+ Set<String> missingZoneNames =
CollectionUtils.difference(zoneNames, foundZoneNames);
+
+ throw new ZonesNotFoundException(missingZoneNames);
+ }
+
+ return zoneDescriptors.stream()
+ .map(CatalogObjectDescriptor::id)
+ .collect(toSet());
}
/**
@@ -348,9 +414,14 @@ public class DisasterRecoveryManager implements
IgniteComponent {
if (raftNodeId.groupId() instanceof TablePartitionId) {
var tablePartitionId = (TablePartitionId)
raftNodeId.groupId();
+ if (!containsOrEmpty(tablePartitionId.partitionId(),
request.partitionIds())) {
+ return;
+ }
+
CatalogTableDescriptor tableDescriptor =
catalogManager.table(tablePartitionId.tableId(), catalogVersion);
// Only tables that belong to a specific catalog version
will be returned.
- if (tableDescriptor == null || request.zoneId() !=
NO_ZONE_ID && tableDescriptor.zoneId() != request.zoneId()) {
+ if (tableDescriptor == null ||
!containsOrEmpty(tableDescriptor.zoneId(), request.zoneIds())
+ ) {
return;
}
@@ -389,6 +460,10 @@ public class DisasterRecoveryManager implements
IgniteComponent {
}, threadPool);
}
+ private static <T> boolean containsOrEmpty(T item, Collection<T>
collection) {
+ return collection.isEmpty() || collection.contains(item);
+ }
+
/**
* Converts internal raft node state into public local partition state.
*/
@@ -417,45 +492,85 @@ public class DisasterRecoveryManager implements
IgniteComponent {
}
}
+ /**
+ * Checks that resulting states contain all partitions IDs from the
request.
+ *
+ * @param foundPartitionIds Found partition IDs.
+ * @param requestedPartitionIds Requested partition IDs.
+ * @throws PartitionsNotFoundException if some IDs are missing.
+ */
+ private static void checkPartitions(
+ Set<Integer> foundPartitionIds,
+ Set<Integer> requestedPartitionIds
+ ) throws PartitionsNotFoundException {
+ if (!requestedPartitionIds.equals(foundPartitionIds)) {
+ Set<Integer> missingPartitionIds =
CollectionUtils.difference(requestedPartitionIds, foundPartitionIds);
+
+ throw new PartitionsNotFoundException(missingPartitionIds);
+ }
+ }
+
/**
* Replaces some healthy states with a {@link
LocalPartitionStateEnum#CATCHING_UP}, it can only be done once the state of all
peers is
* known.
*/
- private static Map<TablePartitionId, Map<String, LocalPartitionState>>
normalizeLocal(
- Map<TablePartitionId, Map<String, LocalPartitionStateMessage>>
result,
+ private static Map<TablePartitionId, LocalPartitionStateByNode>
normalizeLocal(
+ Map<TablePartitionId, LocalPartitionStateMessageByNode> result,
Catalog catalog
) {
- return result.entrySet().stream().collect(toMap(Map.Entry::getKey,
entry -> {
+ Map<TablePartitionId, LocalPartitionStateByNode> map = new HashMap<>();
+
+ for (Map.Entry<TablePartitionId, LocalPartitionStateMessageByNode>
entry : result.entrySet()) {
TablePartitionId tablePartitionId = entry.getKey();
- Map<String, LocalPartitionStateMessage> map = entry.getValue();
+ LocalPartitionStateMessageByNode messageByNode = entry.getValue();
// noinspection OptionalGetWithoutIsPresent
- long maxLogIndex =
map.values().stream().mapToLong(LocalPartitionStateMessage::logIndex).max().getAsLong();
+ long maxLogIndex = messageByNode.values().stream()
+ .mapToLong(LocalPartitionStateMessage::logIndex)
+ .max()
+ .getAsLong();
+
+ Map<String, LocalPartitionState> nodeToStateMap =
messageByNode.entrySet().stream()
+ .collect(toMap(Map.Entry::getKey, nodeToState ->
+ toLocalPartitionState(nodeToState, maxLogIndex,
tablePartitionId, catalog))
+ );
- return map.entrySet().stream().collect(toMap(Map.Entry::getKey,
entry2 -> {
- LocalPartitionStateMessage stateMsg = entry2.getValue();
+ map.put(tablePartitionId, new
LocalPartitionStateByNode(nodeToStateMap));
+ }
- LocalPartitionStateEnum stateEnum = stateMsg.state();
+ return map;
+ }
- if (stateMsg.state() == HEALTHY && maxLogIndex -
stateMsg.logIndex() >= CATCH_UP_THRESHOLD) {
- stateEnum = CATCHING_UP;
- }
+ private static LocalPartitionState toLocalPartitionState(
+ Map.Entry<String, LocalPartitionStateMessage> nodeToMessage,
+ long maxLogIndex,
+ TablePartitionId tablePartitionId,
+ Catalog catalog
+ ) {
+ LocalPartitionStateMessage stateMsg = nodeToMessage.getValue();
+
+ LocalPartitionStateEnum stateEnum = stateMsg.state();
+
+ if (stateMsg.state() == HEALTHY && maxLogIndex - stateMsg.logIndex()
>= CATCH_UP_THRESHOLD) {
+ stateEnum = CATCHING_UP;
+ }
+
+ // Tables, returned from local states request, are always present in
the required version of the catalog.
+ CatalogTableDescriptor tableDescriptor =
catalog.table(tablePartitionId.tableId());
+
+ String zoneName = catalog.zone(tableDescriptor.zoneId()).name();
- // Tables, returned from local states request, are always
present in the required version of the catalog.
- CatalogTableDescriptor tableDescriptor =
catalog.table(tablePartitionId.tableId());
- return new LocalPartitionState(tableDescriptor.name(),
tablePartitionId.partitionId(), stateEnum);
- }));
- }));
+ return new LocalPartitionState(tableDescriptor.name(), zoneName,
tablePartitionId.partitionId(), stateEnum);
}
private static Map<TablePartitionId, GlobalPartitionState> assembleGlobal(
- Map<TablePartitionId, Map<String, LocalPartitionState>>
localResult,
+ Map<TablePartitionId, LocalPartitionStateByNode> localResult,
Catalog catalog
) {
Map<TablePartitionId, GlobalPartitionState> result =
localResult.entrySet().stream()
.collect(toMap(Map.Entry::getKey, entry -> {
TablePartitionId tablePartitionId = entry.getKey();
- Map<String, LocalPartitionState> map = entry.getValue();
+ LocalPartitionStateByNode map = entry.getValue();
return assembleGlobalStateFromLocal(catalog,
tablePartitionId, map);
}));
@@ -473,7 +588,7 @@ public class DisasterRecoveryManager implements
IgniteComponent {
TablePartitionId tablePartitionId = new
TablePartitionId(tableId, partitionId);
result.computeIfAbsent(tablePartitionId, key ->
- new
GlobalPartitionState(catalog.table(key.tableId()).name(), key.partitionId(),
+ new
GlobalPartitionState(catalog.table(key.tableId()).name(),
zoneDescriptor.name(), key.partitionId(),
GlobalPartitionStateEnum.UNAVAILABLE)
);
}
@@ -485,7 +600,7 @@ public class DisasterRecoveryManager implements
IgniteComponent {
private static GlobalPartitionState assembleGlobalStateFromLocal(
Catalog catalog,
TablePartitionId tablePartitionId,
- Map<String, LocalPartitionState> map
+ LocalPartitionStateByNode map
) {
// Tables, returned from local states request, are always present in
the required version of the catalog.
int zoneId = catalog.table(tablePartitionId.tableId()).zoneId();
@@ -512,6 +627,6 @@ public class DisasterRecoveryManager implements
IgniteComponent {
}
LocalPartitionState anyLocalState = map.values().iterator().next();
- return new GlobalPartitionState(anyLocalState.tableName,
tablePartitionId.partitionId(), globalStateEnum);
+ return new GlobalPartitionState(anyLocalState.tableName,
zoneDescriptor.name(), tablePartitionId.partitionId(), globalStateEnum);
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GlobalPartitionState.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GlobalPartitionState.java
index 816bfc670d..754d57a50e 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GlobalPartitionState.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GlobalPartitionState.java
@@ -23,12 +23,15 @@ package
org.apache.ignite.internal.table.distributed.disaster;
public class GlobalPartitionState {
public final String tableName;
+ public final String zoneName;
+
public final int partitionId;
public final GlobalPartitionStateEnum state;
- GlobalPartitionState(String tableName, int partitionId,
GlobalPartitionStateEnum state) {
+ GlobalPartitionState(String tableName, String zoneName, int partitionId,
GlobalPartitionStateEnum state) {
this.tableName = tableName;
+ this.zoneName = zoneName;
this.partitionId = partitionId;
this.state = state;
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java
index 0db3f39b09..7ab2884a8d 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java
@@ -23,12 +23,15 @@ package
org.apache.ignite.internal.table.distributed.disaster;
public class LocalPartitionState {
public final String tableName;
+ public final String zoneName;
+
public final int partitionId;
public final LocalPartitionStateEnum state;
- LocalPartitionState(String tableName, int partitionId,
LocalPartitionStateEnum state) {
+ LocalPartitionState(String tableName, String zoneName, int partitionId,
LocalPartitionStateEnum state) {
this.tableName = tableName;
+ this.zoneName = zoneName;
this.partitionId = partitionId;
this.state = state;
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionStateByNode.java
similarity index 53%
copy from
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java
copy to
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionStateByNode.java
index 0db3f39b09..a53301cf1b 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionStateByNode.java
@@ -17,19 +17,31 @@
package org.apache.ignite.internal.table.distributed.disaster;
-/**
- * Local partition state.
- */
-public class LocalPartitionState {
- public final String tableName;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+/** Container for LocalPartitionState to node name map. */
+public class LocalPartitionStateByNode {
+ private final Map<String, LocalPartitionState> map;
+
+ public LocalPartitionStateByNode(Map<String, LocalPartitionState> map) {
+ this.map = Map.copyOf(map);
+ }
- public final int partitionId;
+ /** Returns collection of local partition states. */
+ public Collection<LocalPartitionState> values() {
+ return map.values();
+ }
- public final LocalPartitionStateEnum state;
+ /** Returns set of map entries. */
+ public Set<Entry<String, LocalPartitionState>> entrySet() {
+ return map.entrySet();
+ }
- LocalPartitionState(String tableName, int partitionId,
LocalPartitionStateEnum state) {
- this.tableName = tableName;
- this.partitionId = partitionId;
- this.state = state;
+ /** Returns set of node names. */
+ public Set<String> keySet() {
+ return map.keySet();
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionStateMessageByNode.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionStateMessageByNode.java
new file mode 100644
index 0000000000..b17095ce90
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionStateMessageByNode.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import
org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStateMessage;
+
+/** Container for LocalPartitionStateMessage to node name map. */
+public class LocalPartitionStateMessageByNode {
+ private final Map<String, LocalPartitionStateMessage> map;
+
+ public LocalPartitionStateMessageByNode(LocalPartitionStateMessageByNode
other) {
+ this.map = new HashMap<>(other.map);
+ }
+
+ public LocalPartitionStateMessageByNode(Map<String,
LocalPartitionStateMessage> map) {
+ this.map = new HashMap<>(map);
+ }
+
+ /** Returns collection of local partition states. */
+ public Collection<LocalPartitionStateMessage> values() {
+ return map.values();
+ }
+
+ /** Returns set of map entries. */
+ public Set<Entry<String, LocalPartitionStateMessage>> entrySet() {
+ return map.entrySet();
+ }
+
+ /** Puts node to state mapping. */
+ public void put(String nodeName, LocalPartitionStateMessage state) {
+ map.put(nodeName, state);
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/DisasterRecoveryException.java
similarity index 60%
copy from
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java
copy to
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/DisasterRecoveryException.java
index 0db3f39b09..e3d4fcbfdc 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/DisasterRecoveryException.java
@@ -15,21 +15,19 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.table.distributed.disaster;
+package org.apache.ignite.internal.table.distributed.disaster.exceptions;
-/**
- * Local partition state.
- */
-public class LocalPartitionState {
- public final String tableName;
+import org.apache.ignite.internal.lang.IgniteInternalException;
- public final int partitionId;
+/** Common exception for disaster recovery. */
+public class DisasterRecoveryException extends IgniteInternalException {
+ private static final long serialVersionUID = -3565357739782565015L;
- public final LocalPartitionStateEnum state;
+ public DisasterRecoveryException(int code, Throwable cause) {
+ super(code, cause);
+ }
- LocalPartitionState(String tableName, int partitionId,
LocalPartitionStateEnum state) {
- this.tableName = tableName;
- this.partitionId = partitionId;
- this.state = state;
+ public DisasterRecoveryException(int code, String message) {
+ super(code, message);
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/NodesNotFoundException.java
similarity index 60%
copy from
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java
copy to
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/NodesNotFoundException.java
index 0db3f39b09..0ab91f65e2 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/NodesNotFoundException.java
@@ -15,21 +15,17 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.table.distributed.disaster;
+package org.apache.ignite.internal.table.distributed.disaster.exceptions;
-/**
- * Local partition state.
- */
-public class LocalPartitionState {
- public final String tableName;
+import static
org.apache.ignite.lang.ErrorGroups.DisasterRecovery.NODES_NOT_FOUND_ERR;
- public final int partitionId;
+import java.util.Set;
- public final LocalPartitionStateEnum state;
+/** Exception is thrown when appropriate node can`t be found. */
+public class NodesNotFoundException extends DisasterRecoveryException {
+ private static final long serialVersionUID = -6295004626426857228L;
- LocalPartitionState(String tableName, int partitionId,
LocalPartitionStateEnum state) {
- this.tableName = tableName;
- this.partitionId = partitionId;
- this.state = state;
+ public NodesNotFoundException(Set<String> missingNodes) {
+ super(NODES_NOT_FOUND_ERR, "Some nodes are missing: " + missingNodes);
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/PartitionsNotFoundException.java
similarity index 58%
copy from
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java
copy to
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/PartitionsNotFoundException.java
index 0db3f39b09..29767dba6b 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/PartitionsNotFoundException.java
@@ -15,21 +15,17 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.table.distributed.disaster;
+package org.apache.ignite.internal.table.distributed.disaster.exceptions;
-/**
- * Local partition state.
- */
-public class LocalPartitionState {
- public final String tableName;
+import static
org.apache.ignite.lang.ErrorGroups.DisasterRecovery.PARTITIONS_NOT_FOUND_ERR;
- public final int partitionId;
+import java.util.Set;
- public final LocalPartitionStateEnum state;
+/** Exception is thrown when appropriate partition can`t be found. */
+public class PartitionsNotFoundException extends DisasterRecoveryException {
+ private static final long serialVersionUID = -9215416423159317425L;
- LocalPartitionState(String tableName, int partitionId,
LocalPartitionStateEnum state) {
- this.tableName = tableName;
- this.partitionId = partitionId;
- this.state = state;
+ public PartitionsNotFoundException(Set<Integer> missingPartitionIds) {
+ super(PARTITIONS_NOT_FOUND_ERR, "Some partitions are missing: " +
missingPartitionIds);
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/ZonesNotFoundException.java
similarity index 59%
copy from
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java
copy to
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/ZonesNotFoundException.java
index 0db3f39b09..fcf72bb698 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionState.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/ZonesNotFoundException.java
@@ -15,21 +15,17 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.table.distributed.disaster;
+package org.apache.ignite.internal.table.distributed.disaster.exceptions;
-/**
- * Local partition state.
- */
-public class LocalPartitionState {
- public final String tableName;
+import static
org.apache.ignite.lang.ErrorGroups.DistributionZones.ZONE_NOT_FOUND_ERR;
- public final int partitionId;
+import java.util.Set;
- public final LocalPartitionStateEnum state;
+/** Exception is thrown when appropriate node can`t be found. */
+public class ZonesNotFoundException extends DisasterRecoveryException {
+ private static final long serialVersionUID = -8475588176132321568L;
- LocalPartitionState(String tableName, int partitionId,
LocalPartitionStateEnum state) {
- this.tableName = tableName;
- this.partitionId = partitionId;
- this.state = state;
+ public ZonesNotFoundException(Set<String> missingZoneNames) {
+ super(ZONE_NOT_FOUND_ERR, "Some distribution zones are missing: " +
missingZoneNames);
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/messages/LocalPartitionStatesRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/messages/LocalPartitionStatesRequest.java
index 0f5472f7b6..a8599ba119 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/messages/LocalPartitionStatesRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/messages/LocalPartitionStatesRequest.java
@@ -17,16 +17,19 @@
package org.apache.ignite.internal.table.distributed.disaster.messages;
+import java.util.Set;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.annotations.Transferable;
import
org.apache.ignite.internal.table.distributed.TableMessageGroup.DisasterRecoveryMessages;
/**
- * Request for reading all partition states from the node.
+ * Request for reading partition states from the node.
*/
@Transferable(DisasterRecoveryMessages.LOCAL_PARTITION_STATE_REQUEST)
public interface LocalPartitionStatesRequest extends NetworkMessage {
- int zoneId();
+ Set<Integer> zoneIds();
+
+ Set<Integer> partitionIds();
int catalogVersion();
}