This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-24211' in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 8355234a2ed7d3f4a35ff8aac3b96c56bc4c23c7 Author: amashenkov <[email protected]> AuthorDate: Mon Jan 13 21:29:05 2025 +0300 wip. Get rid of invalid methods in Catalog. --- .../apache/ignite/internal/catalog/Catalog.java | 18 ------- .../internal/catalog/CatalogManagerImpl.java | 15 +++++- .../ignite/internal/catalog/CatalogService.java | 23 +++++++- .../ignite/client/handler/FakeCatalogService.java | 7 ++- ...balanceMinimumRequiredTimeProviderImplTest.java | 12 ++--- .../rest/api/recovery/ResetPartitionsRequest.java | 13 ++++- .../api/recovery/RestartPartitionsRequest.java | 13 ++++- ...asterRecoveryControllerResetPartitionsTest.java | 6 +-- ...terRecoveryControllerRestartPartitionsTest.java | 18 +++---- .../recovery/ItDisasterRecoveryControllerTest.java | 12 +++-- .../rest/recovery/DisasterRecoveryController.java | 2 + .../disaster/DisasterRecoveryManager.java | 63 ++++++++++++++-------- .../disaster/ItDisasterRecoveryManagerTest.java | 3 +- .../ItDisasterRecoveryReconfigurationTest.java | 59 ++++++++++++-------- .../disaster/ItDisasterRecoverySystemViewTest.java | 2 +- 15 files changed, 172 insertions(+), 94 deletions(-) diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java index fe342436b5..8eb8cb5729 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java @@ -66,7 +66,6 @@ public class Catalog { private final long activationTimestamp; private final Map<String, CatalogSchemaDescriptor> schemasByName; private final Map<String, CatalogZoneDescriptor> zonesByName; - private final Map<String, CatalogTableDescriptor> tablesByName; private final @Nullable CatalogZoneDescriptor defaultZone; @IgniteToStringExclude @@ -113,13 +112,6 @@ public class Catalog { schemasByName = schemas.stream().collect(toMapByName()); zonesByName = zones.stream().collect(toMapByName()); - tablesByName = new HashMap<>(); - for (CatalogSchemaDescriptor schema : schemas) { - for (CatalogTableDescriptor table : schema.tables()) { - tablesByName.put(schema.name() + "." + table.name(), table); - } - } - schemasById = schemas.stream().collect(toMapById()); tablesById = schemas.stream().flatMap(s -> Arrays.stream(s.tables())).collect(toMapById()); indexesById = schemas.stream().flatMap(s -> Arrays.stream(s.indexes())).collect(toMapById()); @@ -175,16 +167,6 @@ public class Catalog { return tablesByName.get(qualifiedTableName); } - /** - * Returns table descriptor by table name and schema name. - * - * @param schemaName Schema name. Case-sensitive, without quotes. - * @param tableName Table name without schema. Case-sensitive, without quotes. - * */ - public @Nullable CatalogTableDescriptor table(String schemaName, String tableName) { - return tablesByName.get(schemaName + "." + tableName); - } - public Collection<CatalogTableDescriptor> tables() { return tablesById.values(); } diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java index 3008862672..a7b6c1ed3c 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java @@ -196,8 +196,8 @@ public class CatalogManagerImpl extends AbstractEventProducer<CatalogEvent, Cata } @Override - public @Nullable CatalogTableDescriptor table(String tableName, long timestamp) { - CatalogSchemaDescriptor schema = catalogAt(timestamp).schema(SqlCommon.DEFAULT_SCHEMA_NAME); + public @Nullable CatalogTableDescriptor table(String schemaName, String tableName, long timestamp) { + CatalogSchemaDescriptor schema = catalogAt(timestamp).schema(schemaName); if (schema == null) { return null; } @@ -306,6 +306,17 @@ public class CatalogManagerImpl extends AbstractEventProducer<CatalogEvent, Cata return catalogAt(timestamp).version(); } + @Override + public long activationTime(int version) { + Catalog catalog = catalog(version); + + if (catalog == null) { + throw new IllegalArgumentException("Catalog version not found: " + version); + } + + return catalog.time(); + } + @Override public int earliestCatalogVersion() { return catalogByVer.firstEntry().getKey(); diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java index 4e91a28f04..c632c8936a 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; import org.apache.ignite.internal.catalog.events.CatalogEvent; import org.apache.ignite.internal.catalog.events.CatalogEventParameters; import org.apache.ignite.internal.event.EventProducer; +import org.apache.ignite.internal.sql.SqlCommon; import org.jetbrains.annotations.Nullable; /** @@ -65,10 +66,26 @@ public interface CatalogService extends EventProducer<CatalogEvent, CatalogEvent @Nullable Catalog catalog(int catalogVersion); - @Nullable CatalogTableDescriptor table(String tableName, long timestamp); + /** + * Resolves table descriptor by the given schema name and table name, which is actual at the given timestamp. + * + * @return Table descriptor or {@code null} if table not found. + */ + @Nullable CatalogTableDescriptor table(String schemaName, String tableName, long timestamp); + + // TODO https://issues.apache.org/jira/browse/IGNITE-24029: Drop this method. Schema must be specified explicitly. + @Deprecated(forRemoval = true) + default @Nullable CatalogTableDescriptor table(String tableName, long timestamp) { + return table(SqlCommon.DEFAULT_SCHEMA_NAME, tableName, timestamp); + } @Nullable CatalogTableDescriptor table(int tableId, long timestamp); + /** + * Returns table descriptor by the given table ID and catalog version. + * + * @return Table descriptor or {@code null} if table not found. + */ @Nullable CatalogTableDescriptor table(int tableId, int catalogVersion); Collection<CatalogTableDescriptor> tables(int catalogVersion); @@ -107,8 +124,12 @@ public interface CatalogService extends EventProducer<CatalogEvent, CatalogEvent @Nullable CatalogSchemaDescriptor activeSchema(@Nullable String schemaName, long timestamp); + /** Gets actual catalog version for the given timestamp. */ int activeCatalogVersion(long timestamp); + /** Gets activation time for the given catalog version. */ + long activationTime(int catalogVersion); + /** Returns the earliest registered version of the catalog. */ int earliestCatalogVersion(); diff --git a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java index 03182e0ce2..403a7fc56d 100644 --- a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java +++ b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java @@ -52,7 +52,7 @@ public class FakeCatalogService implements CatalogService { } @Override - public CatalogTableDescriptor table(String tableName, long timestamp) { + public CatalogTableDescriptor table(String schemaName, String tableName, long timestamp) { return null; } @@ -158,6 +158,11 @@ public class FakeCatalogService implements CatalogService { return 0; } + @Override + public long activationTime(int catalogVersion) { + return 0L; + } + @Override public int earliestCatalogVersion() { return 0; diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceMinimumRequiredTimeProviderImplTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceMinimumRequiredTimeProviderImplTest.java index 05884db372..6243f07333 100644 --- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceMinimumRequiredTimeProviderImplTest.java +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceMinimumRequiredTimeProviderImplTest.java @@ -34,7 +34,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.CompletableFuture; import java.util.function.Function; -import org.apache.ignite.catalog.annotations.Table; import org.apache.ignite.internal.catalog.Catalog; import org.apache.ignite.internal.catalog.commands.ColumnParams; import org.apache.ignite.internal.catalog.commands.CreateTableCommand; @@ -47,6 +46,7 @@ import org.apache.ignite.internal.lang.ByteArray; import org.apache.ignite.internal.partitiondistribution.Assignment; import org.apache.ignite.internal.partitiondistribution.Assignments; import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.sql.SqlCommon; import org.apache.ignite.internal.util.ByteUtils; import org.apache.ignite.internal.util.CollectionUtils; import org.apache.ignite.sql.ColumnType; @@ -58,6 +58,7 @@ import org.junit.jupiter.api.Test; * Test for {@link RebalanceMinimumRequiredTimeProviderImpl}. */ class RebalanceMinimumRequiredTimeProviderImplTest extends BaseDistributionZoneManagerTest { + private static final String SCHEMA_NAME = SqlCommon.DEFAULT_SCHEMA_NAME; private static final String TABLE_NAME = "tableName"; private static final String UPDATED_FILTER_1 = "$..*.*"; @@ -356,7 +357,7 @@ class RebalanceMinimumRequiredTimeProviderImplTest extends BaseDistributionZoneM private int createTable(String defaultZoneName) throws Exception { CompletableFuture<Integer> tableFuture = catalogManager.execute(CreateTableCommand.builder() .tableName(TABLE_NAME) - .schemaName(Table.DEFAULT_SCHEMA) + .schemaName(SCHEMA_NAME) .zone(defaultZoneName) .columns(List.of(ColumnParams.builder().name("key").type(ColumnType.INT32).build())) .primaryKey(TableHashPrimaryKey.builder().columns(List.of("key")).build()) @@ -367,10 +368,7 @@ class RebalanceMinimumRequiredTimeProviderImplTest extends BaseDistributionZoneM int catalogVersion = tableFuture.get(); - Catalog catalog = catalogManager.catalog(catalogVersion); - assertNotNull(catalog); - - CatalogTableDescriptor table = catalog.table(Table.DEFAULT_SCHEMA, TABLE_NAME); + CatalogTableDescriptor table = catalogManager.table(SCHEMA_NAME, TABLE_NAME, catalogVersion); assertNotNull(table); return table.id(); @@ -379,7 +377,7 @@ class RebalanceMinimumRequiredTimeProviderImplTest extends BaseDistributionZoneM private void dropTable(String tableName) { CompletableFuture<Integer> future = catalogManager.execute(DropTableCommand.builder() .tableName(tableName) - .schemaName(Table.DEFAULT_SCHEMA) + .schemaName(SCHEMA_NAME) .build() ); diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/ResetPartitionsRequest.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/ResetPartitionsRequest.java index d8a45ac12e..373a22afdf 100644 --- a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/ResetPartitionsRequest.java +++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/ResetPartitionsRequest.java @@ -36,13 +36,17 @@ public class ResetPartitionsRequest { @Schema(description = "IDs of partitions to reset. All if empty.") private final Set<Integer> partitionIds; - @Schema(description = "Fully-qualified name of the table to reset partitions of. Without quotes, case-sensitive.") + @Schema(description = "Schema name. Without quotes, case-sensitive.") + private final String schemaName; + + @Schema(description = "Table name. Without quotes, case-sensitive.") private final String tableName; /** Constructor. */ @JsonCreator public ResetPartitionsRequest( @JsonProperty("zoneName") String zoneName, + @JsonProperty("schemaName") String schemaName, @JsonProperty("tableName") String tableName, @JsonProperty("partitionIds") @Nullable Collection<Integer> partitionIds ) { @@ -50,6 +54,7 @@ public class ResetPartitionsRequest { Objects.requireNonNull(tableName); this.zoneName = zoneName; + this.schemaName = schemaName; this.tableName = tableName; this.partitionIds = partitionIds == null ? Set.of() : Set.copyOf(partitionIds); } @@ -66,6 +71,12 @@ public class ResetPartitionsRequest { return zoneName; } + /** Returns name of the schema the table belongs to. */ + @JsonGetter("schemaName") + public String schemaName() { + return schemaName; + } + /** Returns name of the table to reset partitions of. */ @JsonGetter("tableName") public String tableName() { diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/RestartPartitionsRequest.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/RestartPartitionsRequest.java index 7e53f9259e..180c87fb7b 100644 --- a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/RestartPartitionsRequest.java +++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/RestartPartitionsRequest.java @@ -40,14 +40,18 @@ public class RestartPartitionsRequest { @Schema(description = "IDs of partitions to restart. If empty/omitted, all partitions will be restarted.") private final Set<Integer> partitionIds; - @Schema(description = "Fully-qualified name of the table to restart partitions of. Without quotes, case-sensitive.") + @Schema(description = "Table name. Without quotes, case-sensitive.") private final String tableName; + @Schema(description = "Schema name. Without quotes, case-sensitive.") + private final String schemaName; + /** Constructor. */ @JsonCreator public RestartPartitionsRequest( @JsonProperty("nodeNames") @Nullable Set<String> nodeNames, @JsonProperty("zoneName") String zoneName, + @JsonProperty("schemaName") String schemaName, @JsonProperty("tableName") String tableName, @JsonProperty("partitionIds") @Nullable Collection<Integer> partitionIds ) { @@ -55,6 +59,7 @@ public class RestartPartitionsRequest { Objects.requireNonNull(tableName); this.zoneName = zoneName; + this.schemaName = schemaName; this.tableName = tableName; this.partitionIds = partitionIds == null ? Set.of() : Set.copyOf(partitionIds); this.nodeNames = nodeNames == null ? Set.of() : Set.copyOf(nodeNames); @@ -84,6 +89,12 @@ public class RestartPartitionsRequest { return tableName; } + /** Returns name of the schema the table belongs to. */ + @JsonGetter("schemaName") + public String schemaName() { + return schemaName; + } + @Override public String toString() { return S.toString(this); diff --git a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerResetPartitionsTest.java b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerResetPartitionsTest.java index 3f58a5c24d..b5994c7e19 100644 --- a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerResetPartitionsTest.java +++ b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerResetPartitionsTest.java @@ -46,7 +46,7 @@ public class ItDisasterRecoveryControllerResetPartitionsTest extends ClusterPerC private static final String TABLE_NAME = "first_ZONE_table"; - private static final String QUALIFIED_TABLE_NAME = "PUBLIC." + TABLE_NAME; + private static final String SCHEMA_NAME = "PUBLIC"; @Inject @Client(NODE_URL + "/management/v1/recovery/") @@ -61,7 +61,7 @@ public class ItDisasterRecoveryControllerResetPartitionsTest extends ClusterPerC @Test public void testResetAllPartitions() { MutableHttpRequest<ResetPartitionsRequest> post = HttpRequest.POST(RESET_PARTITIONS_ENDPOINT, - new ResetPartitionsRequest(FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of())); + new ResetPartitionsRequest(FIRST_ZONE, SCHEMA_NAME, TABLE_NAME, Set.of())); HttpResponse<Void> response = client.toBlocking().exchange(post); @@ -71,7 +71,7 @@ public class ItDisasterRecoveryControllerResetPartitionsTest extends ClusterPerC @Test public void testResetSpecifiedPartitions() { MutableHttpRequest<ResetPartitionsRequest> post = HttpRequest.POST(RESET_PARTITIONS_ENDPOINT, - new ResetPartitionsRequest(FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of(0))); + new ResetPartitionsRequest(FIRST_ZONE, SCHEMA_NAME, TABLE_NAME, Set.of(0))); HttpResponse<Void> response = client.toBlocking().exchange(post); diff --git a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java index 4c9995f78e..3ec08cc288 100644 --- a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java +++ b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java @@ -54,7 +54,7 @@ public class ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe private static final String TABLE_NAME = "first_ZONE_table"; - private static final String QUALIFIED_TABLE_NAME = "PUBLIC." + TABLE_NAME; + private static final String SCHEMA_NAME = "PUBLIC"; public static final String RESTART_PARTITIONS_ENDPOINT = "/partitions/restart"; @@ -74,7 +74,7 @@ public class ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe String unknownZone = "unknown_zone"; MutableHttpRequest<RestartPartitionsRequest> post = HttpRequest.POST(RESTART_PARTITIONS_ENDPOINT, - new RestartPartitionsRequest(Set.of(), unknownZone, QUALIFIED_TABLE_NAME, Set.of())); + new RestartPartitionsRequest(Set.of(), unknownZone, SCHEMA_NAME, TABLE_NAME, Set.of())); HttpClientResponseException e = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(post)); @@ -89,7 +89,7 @@ public class ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe String tableName = "unknown_table"; MutableHttpRequest<RestartPartitionsRequest> post = HttpRequest.POST(RESTART_PARTITIONS_ENDPOINT, - new RestartPartitionsRequest(Set.of(), FIRST_ZONE, tableName, Set.of())); + new RestartPartitionsRequest(Set.of(), FIRST_ZONE, SCHEMA_NAME, tableName, Set.of())); HttpClientResponseException e = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(post)); @@ -102,7 +102,7 @@ public class ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe @Test void testRestartPartitionsIllegalPartitionNegative() { MutableHttpRequest<RestartPartitionsRequest> post = HttpRequest.POST(RESTART_PARTITIONS_ENDPOINT, - new RestartPartitionsRequest(Set.of(), FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of(0, 5, -1, -10))); + new RestartPartitionsRequest(Set.of(), FIRST_ZONE, SCHEMA_NAME, TABLE_NAME, Set.of(0, 5, -1, -10))); HttpClientResponseException e = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(post)); @@ -115,7 +115,7 @@ public class ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe @Test void testRestartPartitionsPartitionsOutOfRange() { MutableHttpRequest<RestartPartitionsRequest> post = HttpRequest.POST(RESTART_PARTITIONS_ENDPOINT, - new RestartPartitionsRequest(Set.of(), FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of(DEFAULT_PARTITION_COUNT))); + new RestartPartitionsRequest(Set.of(), FIRST_ZONE, SCHEMA_NAME, TABLE_NAME, Set.of(DEFAULT_PARTITION_COUNT))); HttpClientResponseException e = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(post)); @@ -138,7 +138,7 @@ public class ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe .collect(toSet()); MutableHttpRequest<RestartPartitionsRequest> post = HttpRequest.POST(RESTART_PARTITIONS_ENDPOINT, - new RestartPartitionsRequest(uppercaseNodeNames, FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of())); + new RestartPartitionsRequest(uppercaseNodeNames, FIRST_ZONE, SCHEMA_NAME, TABLE_NAME, Set.of())); HttpClientResponseException e = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(post)); @@ -150,7 +150,7 @@ public class ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe @Test public void testRestartAllPartitions() { MutableHttpRequest<RestartPartitionsRequest> post = HttpRequest.POST(RESTART_PARTITIONS_ENDPOINT, - new RestartPartitionsRequest(Set.of(), FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of())); + new RestartPartitionsRequest(Set.of(), FIRST_ZONE, SCHEMA_NAME, TABLE_NAME, Set.of())); HttpResponse<Void> response = client.toBlocking().exchange(post); @@ -160,7 +160,7 @@ public class ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe @Test public void testRestartSpecifiedPartitions() { MutableHttpRequest<RestartPartitionsRequest> post = HttpRequest.POST(RESTART_PARTITIONS_ENDPOINT, - new RestartPartitionsRequest(Set.of(), FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of(0, 1))); + new RestartPartitionsRequest(Set.of(), FIRST_ZONE, SCHEMA_NAME, TABLE_NAME, Set.of(0, 1))); HttpResponse<Void> response = client.toBlocking().exchange(post); @@ -172,7 +172,7 @@ public class ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe Set<String> nodeNames = nodeNames(initialNodes() - 1); MutableHttpRequest<RestartPartitionsRequest> post = HttpRequest.POST(RESTART_PARTITIONS_ENDPOINT, - new RestartPartitionsRequest(nodeNames, FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of())); + new RestartPartitionsRequest(nodeNames, FIRST_ZONE, SCHEMA_NAME, TABLE_NAME, Set.of())); HttpResponse<Void> response = client.toBlocking().exchange(post); diff --git a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java index f397708a02..b39c00dd70 100644 --- a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java +++ b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java @@ -70,7 +70,9 @@ public class ItDisasterRecoveryControllerTest extends ClusterPerClassIntegration private static final String FIRST_ZONE = "first_ZONE"; - private static final String QUALIFIED_TABLE_NAME = "PUBLIC.first_ZONE_table"; + private static final String SCHEMA_NAME = "PUBLIC"; + + private static final String TABLE_NAME = "first_ZONE_table"; private static final Set<String> ZONES = Set.of(FIRST_ZONE, "second_ZONE", "third_ZONE"); @@ -382,7 +384,7 @@ public class ItDisasterRecoveryControllerTest extends ClusterPerClassIntegration String unknownZone = "unknown_zone"; MutableHttpRequest<ResetPartitionsRequest> post = HttpRequest.POST(RESET_PARTITIONS_ENDPOINT, - new ResetPartitionsRequest(unknownZone, QUALIFIED_TABLE_NAME, Set.of())); + new ResetPartitionsRequest(unknownZone, SCHEMA_NAME, TABLE_NAME, Set.of())); HttpClientResponseException e = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(post)); @@ -397,7 +399,7 @@ public class ItDisasterRecoveryControllerTest extends ClusterPerClassIntegration String tableName = "unknown_table"; MutableHttpRequest<ResetPartitionsRequest> post = HttpRequest.POST(RESET_PARTITIONS_ENDPOINT, - new ResetPartitionsRequest(FIRST_ZONE, tableName, Set.of())); + new ResetPartitionsRequest(FIRST_ZONE, SCHEMA_NAME, tableName, Set.of())); HttpClientResponseException e = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(post)); @@ -410,7 +412,7 @@ public class ItDisasterRecoveryControllerTest extends ClusterPerClassIntegration @Test void testResetPartitionsIllegalPartitionNegative() { MutableHttpRequest<ResetPartitionsRequest> post = HttpRequest.POST(RESET_PARTITIONS_ENDPOINT, - new ResetPartitionsRequest(FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of(0, 5, -1, -10))); + new ResetPartitionsRequest(FIRST_ZONE, SCHEMA_NAME, TABLE_NAME, Set.of(0, 5, -1, -10))); HttpClientResponseException e = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(post)); @@ -423,7 +425,7 @@ public class ItDisasterRecoveryControllerTest extends ClusterPerClassIntegration @Test void testResetPartitionsPartitionsOutOfRange() { MutableHttpRequest<ResetPartitionsRequest> post = HttpRequest.POST(RESET_PARTITIONS_ENDPOINT, - new ResetPartitionsRequest(FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of(DEFAULT_PARTITION_COUNT))); + new ResetPartitionsRequest(FIRST_ZONE, SCHEMA_NAME, TABLE_NAME, Set.of(DEFAULT_PARTITION_COUNT))); HttpClientResponseException e = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(post)); diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java index 52c7ce47ee..e01f32e670 100644 --- a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java +++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java @@ -86,6 +86,7 @@ public class DisasterRecoveryController implements DisasterRecoveryApi, Resource public CompletableFuture<Void> resetPartitions(@Body ResetPartitionsRequest command) { return disasterRecoveryManager.resetPartitions( command.zoneName(), + command.schemaName(), command.tableName(), command.partitionIds() ); @@ -96,6 +97,7 @@ public class DisasterRecoveryController implements DisasterRecoveryApi, Resource return disasterRecoveryManager.restartPartitions( command.nodeNames(), command.zoneName(), + command.schemaName(), command.tableName(), command.partitionIds() ); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java index 0308365f63..89e835b7cf 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java @@ -119,7 +119,7 @@ import org.jetbrains.annotations.TestOnly; * Manager, responsible for "disaster recovery" operations. * Internally it triggers meta-storage updates, in order to acquire unique causality token. * As a reaction to these updates, manager performs actual recovery operations, - * such as {@link #resetPartitions(String, String, Set, boolean, long)}. + * such as {@link #resetPartitions(String, String, String, Set, boolean, long)}. * More details are in the <a href="https://issues.apache.org/jira/browse/IGNITE-21140">epic</a>. */ public class DisasterRecoveryManager implements IgniteComponent, SystemViewProvider { @@ -310,13 +310,20 @@ public class DisasterRecoveryManager implements IgniteComponent, SystemViewProvi * so that a new leader could be elected. * * @param zoneName Name of the distribution zone. Case-sensitive, without quotes. - * @param tableName Fully-qualified table name. Case-sensitive, without quotes. Example: "PUBLIC.Foo". + * @param schemaName Schema name. Case-sensitive, without quotes. + * @param tableName Table name. Case-sensitive, without quotes. * @param manualUpdate Whether the update is triggered manually by user or automatically by core logic. * @param triggerRevision Revision of the event, which produce this reset. -1 for manual reset. * @return Future that completes when partitions are reset. */ - public CompletableFuture<Void> resetAllPartitions(String zoneName, String tableName, boolean manualUpdate, long triggerRevision) { - return resetPartitions(zoneName, tableName, emptySet(), manualUpdate, triggerRevision); + public CompletableFuture<Void> resetAllPartitions( + String zoneName, + String schemaName, + String tableName, + boolean manualUpdate, + long triggerRevision + ) { + return resetPartitions(zoneName, schemaName, tableName, emptySet(), manualUpdate, triggerRevision); } /** @@ -326,12 +333,13 @@ public class DisasterRecoveryManager implements IgniteComponent, SystemViewProvi * so that a new leader could be elected. * * @param zoneName Name of the distribution zone. Case-sensitive, without quotes. - * @param tableName Fully-qualified table name. Case-sensitive, without quotes. Example: "PUBLIC.Foo". + * @param schemaName Schema name. Case-sensitive, without quotes. + * @param tableName Table name. Case-sensitive, without quotes. * @param partitionIds IDs of partitions to reset. If empty, reset all zone's partitions. * @return Future that completes when partitions are reset. */ - public CompletableFuture<Void> resetPartitions(String zoneName, String tableName, Set<Integer> partitionIds) { - int tableId = tableDescriptor(catalogLatestVersion(), tableName).id(); + public CompletableFuture<Void> resetPartitions(String zoneName, String schemaName, String tableName, Set<Integer> partitionIds) { + int tableId = tableDescriptor(schemaName, tableName, catalogManager.latestCatalogVersion()).id(); return resetPartitions(zoneName, Map.of(tableId, partitionIds), true, -1); } @@ -343,15 +351,22 @@ public class DisasterRecoveryManager implements IgniteComponent, SystemViewProvi * so that a new leader could be elected. * * @param zoneName Name of the distribution zone. Case-sensitive, without quotes. - * @param tableName Fully-qualified table name. Case-sensitive, without quotes. Example: "PUBLIC.Foo". + * @param schemaName Schema name. Case-sensitive, without quotes. + * @param tableName Table name. Case-sensitive, without quotes. * @param partitionIds IDs of partitions to reset. If empty, reset all zone's partitions. * @param manualUpdate Whether the update is triggered manually by user or automatically by core logic. * @param triggerRevision Revision of the event, which produce this reset. -1 for manual reset. * @return Future that completes when partitions are reset. */ private CompletableFuture<Void> resetPartitions( - String zoneName, String tableName, Set<Integer> partitionIds, boolean manualUpdate, long triggerRevision) { - int tableId = tableDescriptor(catalogLatestVersion(), tableName).id(); + String zoneName, + String schemaName, + String tableName, + Set<Integer> partitionIds, + boolean manualUpdate, + long triggerRevision + ) { + int tableId = tableDescriptor(schemaName, tableName, catalogManager.latestCatalogVersion()).id(); return resetPartitions(zoneName, Map.of(tableId, partitionIds), manualUpdate, triggerRevision); } @@ -375,14 +390,14 @@ public class DisasterRecoveryManager implements IgniteComponent, SystemViewProvi long triggerRevision ) { try { - Catalog catalog = catalogLatestVersion(); + int catalogVersion = catalogManager.latestCatalogVersion(); - CatalogZoneDescriptor zone = zoneDescriptor(catalog, zoneName); + CatalogZoneDescriptor zone = zoneDescriptor(zoneName, catalogVersion); partitionIds.values().forEach(ids -> checkPartitionsRange(ids, Set.of(zone))); return processNewRequest( - new GroupUpdateRequest(UUID.randomUUID(), catalog.version(), zone.id(), partitionIds, manualUpdate), + new GroupUpdateRequest(UUID.randomUUID(), catalogVersion, zone.id(), partitionIds, manualUpdate), triggerRevision ); } catch (Throwable t) { @@ -395,13 +410,15 @@ public class DisasterRecoveryManager implements IgniteComponent, SystemViewProvi * * @param nodeNames Names specifying nodes to restart partitions. Case-sensitive, empty set means "all nodes". * @param zoneName Name of the distribution zone. Case-sensitive, without quotes. - * @param tableName Fully-qualified table name. Case-sensitive, without quotes. Example: "PUBLIC.Foo". + * @param schemaName Schema name. Case-sensitive, without quotes. + * @param tableName Table name. Case-sensitive, without quotes. * @param partitionIds IDs of partitions to restart. If empty, restart all zone's partitions. * @return Future that completes when partitions are restarted. */ public CompletableFuture<Void> restartPartitions( Set<String> nodeNames, String zoneName, + String schemaName, String tableName, Set<Integer> partitionIds ) { @@ -409,11 +426,11 @@ public class DisasterRecoveryManager implements IgniteComponent, SystemViewProvi // Validates passed node names. getNodes(nodeNames); - Catalog catalog = catalogLatestVersion(); + int catalogVersion = catalogManager.latestCatalogVersion(); - CatalogZoneDescriptor zone = zoneDescriptor(catalog, zoneName); + CatalogZoneDescriptor zone = zoneDescriptor(zoneName, catalogVersion); - CatalogTableDescriptor table = tableDescriptor(catalog, tableName); + CatalogTableDescriptor table = tableDescriptor(schemaName, tableName, catalogVersion); checkPartitionsRange(partitionIds, Set.of(zone)); @@ -423,7 +440,7 @@ public class DisasterRecoveryManager implements IgniteComponent, SystemViewProvi table.id(), partitionIds, nodeNames, - catalog.time() + catalogManager.activationTime(catalogVersion) )); } catch (Throwable t) { return failedFuture(t); @@ -975,18 +992,18 @@ public class DisasterRecoveryManager implements IgniteComponent, SystemViewProvi return catalog; } - private static CatalogTableDescriptor tableDescriptor(Catalog catalog, String tableName) { - CatalogTableDescriptor tableDescriptor = catalog.table(tableName); + private CatalogTableDescriptor tableDescriptor(String schemaName, String tableName, int catalogVersion) { + CatalogTableDescriptor tableDescriptor = catalogManager.table(schemaName, tableName, catalogVersion); if (tableDescriptor == null) { - throw new TableNotFoundException(tableName); + throw new TableNotFoundException(schemaName, tableName); } return tableDescriptor; } - private static CatalogZoneDescriptor zoneDescriptor(Catalog catalog, String zoneName) { - CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneName); + private CatalogZoneDescriptor zoneDescriptor(String zoneName, int catalogVersion) { + CatalogZoneDescriptor zoneDescriptor = catalogManager.zone(zoneName, catalogVersion); if (zoneDescriptor == null) { throw new DistributionZoneNotFoundException(zoneName); diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java index e808f07708..f540c50bd0 100644 --- a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java +++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java @@ -77,7 +77,8 @@ public class ItDisasterRecoveryManagerTest extends ClusterPerTestIntegrationTest CompletableFuture<Void> restartPartitionsFuture = node.disasterRecoveryManager().restartPartitions( Set.of(node.name()), ZONE_NAME, - SqlCommon.DEFAULT_SCHEMA_NAME + "." + TABLE_NAME, + SqlCommon.DEFAULT_SCHEMA_NAME, + TABLE_NAME, Set.of(partitionId) ); diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java index e4d28e7b32..846336b6b1 100644 --- a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java +++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java @@ -138,7 +138,7 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra /** Test table name. */ private static final String TABLE_NAME = "TEST"; - private static final String QUALIFIED_TABLE_NAME = "PUBLIC.TEST"; + private static final String SCHEMA_NAME = "PUBLIC"; private static final int ENTRIES = 2; @@ -267,7 +267,8 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra CompletableFuture<?> updateFuture = node0.disasterRecoveryManager().resetAllPartitions( zoneName, - QUALIFIED_TABLE_NAME, + SCHEMA_NAME, + TABLE_NAME, true, -1 ); @@ -323,7 +324,8 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra CompletableFuture<?> updateFuture = node0.disasterRecoveryManager().resetPartitions( zoneName, - QUALIFIED_TABLE_NAME, + SCHEMA_NAME, + TABLE_NAME, Set.of(anotherPartId) ); @@ -362,7 +364,8 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra CompletableFuture<?> updateFuture = node0.disasterRecoveryManager().resetAllPartitions( zoneName, - QUALIFIED_TABLE_NAME, + SCHEMA_NAME, + TABLE_NAME, true, 0 ); @@ -429,7 +432,8 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra // planned = [0, 3, 4] CompletableFuture<?> updateFuture = node0.disasterRecoveryManager().resetAllPartitions( zoneName, - QUALIFIED_TABLE_NAME, + SCHEMA_NAME, + TABLE_NAME, true, -1 ); @@ -578,7 +582,8 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra // Reset produces // pending = [1, force] // planned = [0, 1, 3] - CompletableFuture<Void> resetFuture = node0.disasterRecoveryManager().resetAllPartitions(zoneName, QUALIFIED_TABLE_NAME, true, -1); + CompletableFuture<Void> resetFuture = node0.disasterRecoveryManager() + .resetAllPartitions(zoneName, SCHEMA_NAME, TABLE_NAME, true, -1); assertThat(resetFuture, willCompleteSuccessfully()); waitForPartitionState(node0, partId, GlobalPartitionStateEnum.DEGRADED); @@ -606,7 +611,7 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra waitForPartitionState(node0, partId, GlobalPartitionStateEnum.DEGRADED); - resetFuture = node0.disasterRecoveryManager().resetAllPartitions(zoneName, QUALIFIED_TABLE_NAME, true, -1); + resetFuture = node0.disasterRecoveryManager().resetAllPartitions(zoneName, SCHEMA_NAME, TABLE_NAME, true, -1); assertThat(resetFuture, willCompleteSuccessfully()); waitForPartitionState(node0, partId, GlobalPartitionStateEnum.AVAILABLE); @@ -663,7 +668,8 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra // Reset produces // pending = [0, force] // planned = [0, 2, 3] - CompletableFuture<Void> resetFuture = node0.disasterRecoveryManager().resetAllPartitions(zoneName, QUALIFIED_TABLE_NAME, true, -1); + CompletableFuture<Void> resetFuture = node0.disasterRecoveryManager() + .resetAllPartitions(zoneName, SCHEMA_NAME, TABLE_NAME, true, -1); assertThat(resetFuture, willCompleteSuccessfully()); waitForPartitionState(node0, partId, GlobalPartitionStateEnum.AVAILABLE); @@ -693,7 +699,8 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra ); blockRebalanceStableSwitch(partId, blockedRebalance3); - CompletableFuture<Void> resetFuture2 = node0.disasterRecoveryManager().resetAllPartitions(zoneName, QUALIFIED_TABLE_NAME, true, -1); + CompletableFuture<Void> resetFuture2 = node0.disasterRecoveryManager() + .resetAllPartitions(zoneName, SCHEMA_NAME, TABLE_NAME, true, -1); assertThat(resetFuture2, willCompleteSuccessfully()); Assignments pendingAssignments = getPendingAssignments(node0, partId); @@ -729,7 +736,8 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra // Reset produces // pending = [0, force] // planned = [0, 3, 4] - CompletableFuture<Void> resetFuture = node0.disasterRecoveryManager().resetAllPartitions(zoneName, QUALIFIED_TABLE_NAME, true, -1); + CompletableFuture<Void> resetFuture = node0.disasterRecoveryManager() + .resetAllPartitions(zoneName, SCHEMA_NAME, TABLE_NAME, true, -1); assertThat(resetFuture, willCompleteSuccessfully()); Assignments assignmentsPending = Assignments.forced(Set.of( @@ -783,7 +791,8 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra CompletableFuture<?> updateFuture = node0.disasterRecoveryManager().resetAllPartitions( zoneName, - QUALIFIED_TABLE_NAME, + SCHEMA_NAME, + TABLE_NAME, false, 1 ); @@ -833,7 +842,8 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra CompletableFuture<?> updateFuture = node0.disasterRecoveryManager().resetAllPartitions( zoneName, - QUALIFIED_TABLE_NAME, + SCHEMA_NAME, + TABLE_NAME, false, 1 ); @@ -902,7 +912,7 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra stopNode(4); CompletableFuture<Void> resetFuture = - node0.disasterRecoveryManager().resetAllPartitions(zoneName, QUALIFIED_TABLE_NAME, false, 1); + node0.disasterRecoveryManager().resetAllPartitions(zoneName, SCHEMA_NAME, TABLE_NAME, false, 1); assertThat(resetFuture, willCompleteSuccessfully()); // force == true, fromReset == false. @@ -1028,7 +1038,8 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra CompletableFuture<?> updateFuture = node0.disasterRecoveryManager().resetAllPartitions( zoneName, - QUALIFIED_TABLE_NAME, + SCHEMA_NAME, + TABLE_NAME, true, -1 ); @@ -1161,7 +1172,8 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra CompletableFuture<?> updateFuture = node0.disasterRecoveryManager().resetAllPartitions( zoneName, - QUALIFIED_TABLE_NAME, + SCHEMA_NAME, + TABLE_NAME, false, 1 ); @@ -1221,7 +1233,8 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra CompletableFuture<?> updateFuture = node0.disasterRecoveryManager().resetAllPartitions( zoneName, - QUALIFIED_TABLE_NAME, + SCHEMA_NAME, + TABLE_NAME, true, -1 ); @@ -1317,7 +1330,8 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra CompletableFuture<?> updateFuture = node0.disasterRecoveryManager().resetAllPartitions( zoneName, - QUALIFIED_TABLE_NAME, + SCHEMA_NAME, + TABLE_NAME, true, -1 ); @@ -1477,7 +1491,8 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra CompletableFuture<?> updateFuture = node0.disasterRecoveryManager().resetAllPartitions( zoneName, - QUALIFIED_TABLE_NAME, + SCHEMA_NAME, + TABLE_NAME, true, -1 ); @@ -1501,12 +1516,13 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra assertPendingAssignments(node0, partId, assignmentsPending); - logger().info("Stopping nodes [ids={}].", Arrays.toString(new int[]{ 2})); + logger().info("Stopping nodes [ids={}].", Arrays.toString(new int[]{2})); stopNode(2); CompletableFuture<?> updateFuture2 = node0.disasterRecoveryManager().resetAllPartitions( zoneName, - QUALIFIED_TABLE_NAME, + SCHEMA_NAME, + TABLE_NAME, true, -1 ); @@ -1564,7 +1580,8 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra CompletableFuture<?> updateFuture2 = node0.disasterRecoveryManager().resetAllPartitions( zoneName, - QUALIFIED_TABLE_NAME, + SCHEMA_NAME, + TABLE_NAME, true, -1 ); diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java index f866225eed..fbf1d666df 100644 --- a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java +++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java @@ -186,7 +186,7 @@ public class ItDisasterRecoverySystemViewTest extends BaseSqlIntegrationTest { private static int getTableId(String schemaName, String tableName) { CatalogManager catalogManager = unwrapIgniteImpl(CLUSTER.aliveNode()).catalogManager(); - return catalogManager.catalog(catalogManager.latestCatalogVersion()).table(schemaName, tableName).id(); + return catalogManager.table(schemaName, tableName, catalogManager.latestCatalogVersion()).id(); } private static long estimatedSize(String nodeName, String tableName, int partitionId) {
