This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 37f96f0ba92 IGNITE-27437 Introduce PartitionCounterProvider (#7324)
37f96f0ba92 is described below
commit 37f96f0ba92885b2cc79c7a8ee15dd2466125509
Author: Mikhail Efremov <[email protected]>
AuthorDate: Mon Feb 9 20:00:10 2026 +0600
IGNITE-27437 Introduce PartitionCounterProvider (#7324)
---
.../compaction/AbstractCatalogCompactionTest.java | 4 +-
.../internal/catalog/CatalogManagerImpl.java | 13 +-
.../catalog/CatalogParamsValidationUtils.java | 17 +++
.../PartitionCountCalculationParameters.java | 132 +++++++++++++++++++++
.../internal/catalog/PartitionCountProvider.java | 32 +++++
.../ignite/internal/catalog/UpdateContext.java | 13 ++
.../internal/catalog/commands/CatalogUtils.java | 17 ++-
.../catalog/commands/CreateTableCommand.java | 2 +-
.../catalog/commands/CreateZoneCommand.java | 31 ++++-
.../internal/catalog/CatalogManagerSelfTest.java | 16 ++-
.../internal/catalog/BaseCatalogManagerTest.java | 3 +-
.../ignite/internal/catalog/CatalogTestUtils.java | 27 ++++-
...niteDistributionZoneManagerNodeRestartTest.java | 4 +-
.../rebalance/ItRebalanceDistributedTest.java | 4 +-
.../partition/replicator/fixtures/Node.java | 4 +-
.../PartitionReplicaLifecycleManagerTest.java | 4 +-
.../runner/app/ItIgniteNodeRestartTest.java | 4 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 8 +-
.../catalog/PartitionCountProviderWrapper.java | 45 +++++++
.../catalog/PartitionCountProviderWrapperTest.java | 118 ++++++++++++++++++
20 files changed, 469 insertions(+), 29 deletions(-)
diff --git
a/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/AbstractCatalogCompactionTest.java
b/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/AbstractCatalogCompactionTest.java
index c62f0eb388f..81cdaabac1d 100644
---
a/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/AbstractCatalogCompactionTest.java
+++
b/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/AbstractCatalogCompactionTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.spy;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.ignite.internal.catalog.CatalogManagerImpl;
+import org.apache.ignite.internal.catalog.PartitionCountProvider;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.failure.NoOpFailureManager;
@@ -84,7 +85,8 @@ abstract class AbstractCatalogCompactionTest extends
BaseIgniteAbstractTest {
new UpdateLogImpl(metastore, failureProcessor),
clockService,
failureProcessor,
- () -> TEST_DELAY_DURATION
+ () -> TEST_DELAY_DURATION,
+ PartitionCountProvider.defaultPartitionCountProvider()
);
assertThat(startAsync(new ComponentContext(), metastore),
willCompleteSuccessfully());
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
index fec18f416db..f7fcbe32cd8 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -113,6 +113,11 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
*/
private final Object lastSaveUpdateFutureMutex = new Object();
+ /**
+ * Partition count provider for command update contexts.
+ */
+ private final PartitionCountProvider partitionCountProvider;
+
/**
* Constructor.
*/
@@ -120,13 +125,15 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
UpdateLog updateLog,
ClockService clockService,
FailureProcessor failureProcessor,
- LongSupplier delayDurationMsSupplier
+ LongSupplier delayDurationMsSupplier,
+ PartitionCountProvider partitionCountProvider
) {
this.updateLog = updateLog;
this.clockService = clockService;
this.failureProcessor = failureProcessor;
this.delayDurationMsSupplier = delayDurationMsSupplier;
this.catalogSystemViewProvider = new CatalogSystemViewRegistry(() ->
catalogAt(clockService.nowLong()));
+ this.partitionCountProvider = partitionCountProvider;
}
@Override
@@ -260,7 +267,7 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
var initialEntries = new ArrayList<UpdateEntry>();
- var context = new UpdateContext(emptyCatalog);
+ var context = new UpdateContext(emptyCatalog, partitionCountProvider);
for (UpdateProducer producer : initCommands) {
List<UpdateEntry> entries = producer.get(context);
@@ -413,7 +420,7 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
BitSet applyResults = new BitSet(updateProducers.size());
List<UpdateEntry> bulkUpdateEntries = new ArrayList<>();
try {
- UpdateContext updateContext = new UpdateContext(catalog);
+ UpdateContext updateContext = new UpdateContext(catalog,
partitionCountProvider);
for (int i = 0; i < updateProducers.size(); i++) {
UpdateProducer update = updateProducers.get(i);
List<UpdateEntry> entries = update.get(updateContext);
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogParamsValidationUtils.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogParamsValidationUtils.java
index 01a94c68927..5c66c873564 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogParamsValidationUtils.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogParamsValidationUtils.java
@@ -103,6 +103,23 @@ public class CatalogParamsValidationUtils {
}
}
+ /**
+ * Validates that the list of storage profile names is neither {@code null} nor empty.
+ */
+ public static void validateStorageProfileNames(List<String>
storageProfileNames) {
+ if (storageProfileNames == null) {
+ throw new CatalogValidationException(
+ "Storage profile cannot be null."
+ );
+ }
+
+ if (storageProfileNames.isEmpty()) {
+ throw new CatalogValidationException(
+ "Storage profile cannot be empty."
+ );
+ }
+ }
+
/**
* Validates that given consistency mode is has only the expected values.
*
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/PartitionCountCalculationParameters.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/PartitionCountCalculationParameters.java
new file mode 100644
index 00000000000..2d1e9aaa6f7
--- /dev/null
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/PartitionCountCalculationParameters.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+import static java.util.Objects.requireNonNullElse;
+import static
org.apache.ignite.internal.catalog.CatalogParamsValidationUtils.validateField;
+import static
org.apache.ignite.internal.catalog.CatalogParamsValidationUtils.validateStorageProfileNames;
+import static
org.apache.ignite.internal.catalog.CatalogParamsValidationUtils.validateZoneFilter;
+import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_FILTER;
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_REPLICA_COUNT;
+
+import java.util.List;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Parameters container for {@link PartitionCountProvider#calculate}.
+ */
+public final class PartitionCountCalculationParameters {
+ private final String dataNodesFilter;
+
+ private final List<String> storageProfiles;
+
+ private final int replicaFactor;
+
+ private PartitionCountCalculationParameters(
+ String dataNodesFilter,
+ List<String> storageProfiles,
+ int replicaFactor
+ ) {
+ this.dataNodesFilter = dataNodesFilter;
+ this.storageProfiles = storageProfiles;
+ this.replicaFactor = replicaFactor;
+
+ validate();
+ }
+
+ private void validate() {
+ validateZoneFilter(dataNodesFilter);
+ validateStorageProfileNames(storageProfiles);
+ validateField(replicaFactor, 1, null, "Invalid number of replicas");
+ }
+
+ /** Returns data nodes filter. */
+ public String dataNodesFilter() {
+ return dataNodesFilter;
+ }
+
+ /** Returns storage profiles. */
+ public List<String> storageProfiles() {
+ return storageProfiles;
+ }
+
+ /** Returns replica factor. */
+ public int replicaFactor() {
+ return replicaFactor;
+ }
+
+ /** Creates new builder for parameters. */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /** Parameters builder. */
+ public static final class Builder {
+ /** Data nodes filter, {@code null} if not set. */
+ private @Nullable String dataNodesFilter;
+
+ /** Storage profiles, {@code null} if not set. */
+ private @Nullable List<String> storageProfiles;
+
+ /** Replica factor, {@code null} if not set. */
+ private @Nullable Integer replicaFactor;
+
+ /**
+ * Set data nodes filter.
+ *
+ * @param dataNodesFilter Data nodes filter.
+ * @return {@code this}.
+ */
+ public Builder dataNodesFilter(@Nullable String dataNodesFilter) {
+ this.dataNodesFilter = dataNodesFilter;
+ return this;
+ }
+
+ /**
+ * Set storage profiles.
+ *
+ * @param storageProfiles Storage profiles.
+ * @return {@code this}.
+ */
+ public Builder storageProfiles(@Nullable List<String> storageProfiles)
{
+ this.storageProfiles = storageProfiles;
+ return this;
+ }
+
+ /**
+ * Set replica factor.
+ *
+ * @param replicaFactor Replica factor.
+ * @return {@code this}.
+ */
+ public Builder replicaFactor(@Nullable Integer replicaFactor) {
+ this.replicaFactor = replicaFactor;
+ return this;
+ }
+
+ /** Builds parameters. */
+ public PartitionCountCalculationParameters build() {
+ return new PartitionCountCalculationParameters(
+ requireNonNullElse(dataNodesFilter, DEFAULT_FILTER),
+ requireNonNullElse(storageProfiles,
List.of(DEFAULT_STORAGE_PROFILE)),
+ requireNonNullElse(replicaFactor, DEFAULT_REPLICA_COUNT)
+ );
+ }
+ }
+}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/PartitionCountProvider.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/PartitionCountProvider.java
new file mode 100644
index 00000000000..7029cc53987
--- /dev/null
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/PartitionCountProvider.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
+
+/**
+ * Provider that calculates a partition count from the given parameters; see {@link #defaultPartitionCountProvider()} for the default.
+ */
+@FunctionalInterface
+public interface PartitionCountProvider {
+ int calculate(PartitionCountCalculationParameters params);
+
+ static PartitionCountProvider defaultPartitionCountProvider() {
+ return params -> DEFAULT_PARTITION_COUNT;
+ }
+}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/UpdateContext.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/UpdateContext.java
index 1f5a5675208..3557ae055f5 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/UpdateContext.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/UpdateContext.java
@@ -34,10 +34,18 @@ public class UpdateContext {
/** The updatable catalog descriptor. */
private Catalog updatableCatalog;
+ private final PartitionCountProvider partitionCountProvider;
+
/** Constructor. */
public UpdateContext(Catalog catalog) {
+ this(catalog, PartitionCountProvider.defaultPartitionCountProvider());
+ }
+
+ /** Constructor. */
+ public UpdateContext(Catalog catalog, PartitionCountProvider
partitionCountProvider) {
this.baseCatalog = catalog;
this.updatableCatalog = catalog;
+ this.partitionCountProvider = partitionCountProvider;
}
/**
@@ -61,4 +69,9 @@ public class UpdateContext {
public void updateCatalog(Function<Catalog, Catalog> updater) {
updatableCatalog = updater.apply(updatableCatalog);
}
+
+ /** Returns partition count provider. */
+ public PartitionCountProvider partitionCountProvider() {
+ return partitionCountProvider;
+ }
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java
index e02ed7994c5..986c4308d13 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java
@@ -40,6 +40,8 @@ import java.util.Set;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.catalog.IndexNotFoundValidationException;
+import org.apache.ignite.internal.catalog.PartitionCountCalculationParameters;
+import org.apache.ignite.internal.catalog.PartitionCountProvider;
import org.apache.ignite.internal.catalog.commands.DefaultValue.FunctionCall;
import org.apache.ignite.internal.catalog.commands.DefaultValue.Type;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
@@ -589,13 +591,14 @@ public final class CatalogUtils {
*/
public static CatalogZoneDescriptor createDefaultZoneDescriptor(
Catalog catalog,
+ PartitionCountProvider partitionCountProvider,
int newDefaultZoneId,
Collection<UpdateEntry> updateEntries
) throws CatalogValidationException {
// TODO: Remove after
https://issues.apache.org/jira/browse/IGNITE-26798
checkDuplicateDefaultZoneName(catalog);
- CatalogZoneDescriptor defaultZone =
createDefaultZoneDescriptor(newDefaultZoneId);
+ CatalogZoneDescriptor defaultZone =
createDefaultZoneDescriptor(partitionCountProvider, newDefaultZoneId);
updateEntries.add(new NewZoneEntry(defaultZone));
updateEntries.add(new SetDefaultZoneEntry(defaultZone.id()));
@@ -603,11 +606,19 @@ public final class CatalogUtils {
return defaultZone;
}
- private static CatalogZoneDescriptor createDefaultZoneDescriptor(int
newDefaultZoneId) {
+ private static CatalogZoneDescriptor
createDefaultZoneDescriptor(PartitionCountProvider partitionCountProvider, int
newDefaultZoneId) {
+ PartitionCountCalculationParameters
partitionCountCalculationParameters =
PartitionCountCalculationParameters.builder()
+ .replicaFactor(DEFAULT_REPLICA_COUNT)
+ .dataNodesFilter(DEFAULT_FILTER)
+ .storageProfiles(List.of(DEFAULT_STORAGE_PROFILE))
+ .build();
+
+ int partitionCount =
partitionCountProvider.calculate(partitionCountCalculationParameters);
+
return new CatalogZoneDescriptor(
newDefaultZoneId,
DEFAULT_ZONE_NAME,
- DEFAULT_PARTITION_COUNT,
+ partitionCount,
DEFAULT_REPLICA_COUNT,
DEFAULT_ZONE_QUORUM_SIZE,
IMMEDIATE_TIMER_VALUE,
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java
index c0c573105ef..1162fd61f13 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java
@@ -143,7 +143,7 @@ public class CreateTableCommand extends
AbstractTableCommand {
List<UpdateEntry> updateEntries = new ArrayList<>(5);
CatalogZoneDescriptor zone = shouldCreateNewDefaultZone(catalog,
zoneName)
- ? createDefaultZoneDescriptor(catalog, id++, updateEntries)
+ ? createDefaultZoneDescriptor(catalog,
updateContext.partitionCountProvider(), id++, updateEntries)
: zoneByNameOrDefaultOrThrow(catalog, zoneName);
assert zone != null;
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateZoneCommand.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateZoneCommand.java
index 84bdf0a905c..c3a4b9bd065 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateZoneCommand.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateZoneCommand.java
@@ -21,12 +21,12 @@ import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.lang.Math.round;
import static java.util.Objects.requireNonNullElse;
+import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.catalog.CatalogParamsValidationUtils.validateConsistencyMode;
import static
org.apache.ignite.internal.catalog.CatalogParamsValidationUtils.validateField;
import static
org.apache.ignite.internal.catalog.CatalogParamsValidationUtils.validateStorageProfiles;
import static
org.apache.ignite.internal.catalog.CatalogParamsValidationUtils.validateZoneFilter;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_FILTER;
-import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_REPLICA_COUNT;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.IMMEDIATE_TIMER_VALUE;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
@@ -40,7 +40,11 @@ import java.util.List;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogValidationException;
+import org.apache.ignite.internal.catalog.PartitionCountCalculationParameters;
+import org.apache.ignite.internal.catalog.PartitionCountProvider;
import org.apache.ignite.internal.catalog.UpdateContext;
+import
org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescriptor;
+import
org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfilesDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode;
import org.apache.ignite.internal.catalog.storage.NewZoneEntry;
@@ -130,7 +134,7 @@ public class CreateZoneCommand extends AbstractZoneCommand {
throw
duplicateDistributionZoneNameCatalogValidationException(zoneName);
}
- CatalogZoneDescriptor zoneDesc =
descriptor(catalog.objectIdGenState());
+ CatalogZoneDescriptor zoneDesc =
descriptor(updateContext.partitionCountProvider(), catalog.objectIdGenState());
return List.of(
new NewZoneEntry(zoneDesc),
@@ -138,13 +142,28 @@ public class CreateZoneCommand extends
AbstractZoneCommand {
);
}
- private CatalogZoneDescriptor descriptor(int objectId) {
+ private CatalogZoneDescriptor descriptor(PartitionCountProvider
partitionCountProvider, int objectId) {
+ String filter = requireNonNullElse(this.filter, DEFAULT_FILTER);
+
int replicas = requireNonNullElse(this.replicas,
DEFAULT_REPLICA_COUNT);
+ CatalogStorageProfilesDescriptor storageProfilesDescriptor =
fromParams(storageProfileParams);
+
+ List<String> storageProfiles = storageProfilesDescriptor.profiles()
+ .stream()
+ .map(CatalogStorageProfileDescriptor::storageProfile)
+ .collect(toList());
+
+ PartitionCountCalculationParameters
partitionCountCalculationParameters =
PartitionCountCalculationParameters.builder()
+ .dataNodesFilter(filter)
+ .storageProfiles(storageProfiles)
+ .replicaFactor(replicas)
+ .build();
+
return new CatalogZoneDescriptor(
objectId,
zoneName,
- requireNonNullElse(partitions, DEFAULT_PARTITION_COUNT),
+ requireNonNullElse(partitions,
partitionCountProvider.calculate(partitionCountCalculationParameters)),
replicas,
requireNonNullElse(quorumSize, defaultQuorumSize(replicas)),
requireNonNullElse(
@@ -152,8 +171,8 @@ public class CreateZoneCommand extends AbstractZoneCommand {
IMMEDIATE_TIMER_VALUE
),
requireNonNullElse(dataNodesAutoAdjustScaleDown,
INFINITE_TIMER_VALUE),
- requireNonNullElse(filter, DEFAULT_FILTER),
- fromParams(storageProfileParams),
+ filter,
+ storageProfilesDescriptor,
requireNonNullElse(consistencyMode, STRONG_CONSISTENCY)
);
}
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
index 257c63be789..50ecd0cc648 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
@@ -181,7 +181,13 @@ public class CatalogManagerSelfTest extends
BaseCatalogManagerTest {
when(updateLogMock.startAsync(componentContext)).thenReturn(nullCompletedFuture());
when(updateLogMock.append(any())).thenReturn(trueCompletedFuture());
- CatalogManagerImpl manager = new CatalogManagerImpl(updateLogMock,
clockService, new NoOpFailureManager(), delayDuration::get);
+ CatalogManagerImpl manager = new CatalogManagerImpl(
+ updateLogMock,
+ clockService,
+ new NoOpFailureManager(),
+ delayDuration::get,
+ PartitionCountProvider.defaultPartitionCountProvider()
+ );
assertThat(manager.startAsync(componentContext),
willCompleteSuccessfully());
reset(updateLogMock);
@@ -320,7 +326,13 @@ public class CatalogManagerSelfTest extends
BaseCatalogManagerTest {
when(updateLogMock.stopAsync(stopComponentContext)).thenReturn(nullCompletedFuture());
when(updateLogMock.append(any())).thenReturn(trueCompletedFuture());
- CatalogManagerImpl manager = new CatalogManagerImpl(updateLogMock,
clockService, new NoOpFailureManager(), delayDuration::get);
+ CatalogManagerImpl manager = new CatalogManagerImpl(
+ updateLogMock,
+ clockService,
+ new NoOpFailureManager(),
+ delayDuration::get,
+ PartitionCountProvider.defaultPartitionCountProvider()
+ );
assertThat(manager.startAsync(startComponentContext),
willCompleteSuccessfully());
diff --git
a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/BaseCatalogManagerTest.java
b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/BaseCatalogManagerTest.java
index 8bb06c8fe7f..387e11022b8 100644
---
a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/BaseCatalogManagerTest.java
+++
b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/BaseCatalogManagerTest.java
@@ -131,7 +131,8 @@ public abstract class BaseCatalogManagerTest extends
BaseIgniteAbstractTest {
updateLog,
clockService,
failureProcessor,
- delayDuration::get
+ delayDuration::get,
+ PartitionCountProvider.defaultPartitionCountProvider()
);
ComponentContext context = new ComponentContext();
diff --git
a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
index ecec0f12643..1313a234195 100644
---
a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
+++
b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
@@ -106,7 +106,14 @@ public class CatalogTestUtils {
FailureProcessor failureProcessor = new NoOpFailureManager();
UpdateLogImpl updateLog = new UpdateLogImpl(metastore,
failureProcessor);
- return new CatalogManagerImpl(updateLog, clockService,
failureProcessor, delayDurationMsSupplier) {
+
+ return new CatalogManagerImpl(
+ updateLog,
+ clockService,
+ failureProcessor,
+ delayDurationMsSupplier,
+ PartitionCountProvider.defaultPartitionCountProvider()
+ ) {
@Override
public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
assertThat(metastore.startAsync(componentContext),
willCompleteSuccessfully());
@@ -151,11 +158,13 @@ public class CatalogTestUtils {
StandaloneMetaStorageManager metastore =
StandaloneMetaStorageManager.create(nodeName);
FailureProcessor failureProcessor = mock(FailureProcessor.class);
+
return new CatalogManagerImpl(
new UpdateLogImpl(metastore, failureProcessor),
new TestClockService(clock, clockWaiter),
failureProcessor,
- () -> TEST_DELAY_DURATION
+ () -> TEST_DELAY_DURATION,
+ PartitionCountProvider.defaultPartitionCountProvider()
) {
@Override
public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
@@ -195,11 +204,13 @@ public class CatalogTestUtils {
HybridClock clock
) {
var failureProcessor = new NoOpFailureManager();
+
return new CatalogManagerImpl(
new UpdateLogImpl(metastore, failureProcessor),
new TestClockService(clock, clockWaiter),
failureProcessor,
- () -> TEST_DELAY_DURATION
+ () -> TEST_DELAY_DURATION,
+ PartitionCountProvider.defaultPartitionCountProvider()
);
}
@@ -231,11 +242,13 @@ public class CatalogTestUtils {
var clockWaiter = new ClockWaiter(nodeName, clock, scheduledExecutor);
var failureProcessor = new NoOpFailureManager();
+
return new CatalogManagerImpl(
new UpdateLogImpl(metastore, failureProcessor),
new TestClockService(clock, clockWaiter),
failureProcessor,
- delayDurationMsSupplier
+ delayDurationMsSupplier,
+ PartitionCountProvider.defaultPartitionCountProvider()
) {
@Override
public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
@@ -306,7 +319,8 @@ public class CatalogTestUtils {
updateLog,
new TestClockService(clock, clockWaiter),
failureProcessor,
- () -> TEST_DELAY_DURATION
+ () -> TEST_DELAY_DURATION,
+ PartitionCountProvider.defaultPartitionCountProvider()
) {
@Override
public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
@@ -353,7 +367,8 @@ public class CatalogTestUtils {
new TestUpdateLog(clock),
new TestClockService(clock, clockWaiter),
new NoOpFailureManager(),
- () -> TEST_DELAY_DURATION
+ () -> TEST_DELAY_DURATION,
+ PartitionCountProvider.defaultPartitionCountProvider()
) {
@Override
public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
index a2e0b6d4c2a..eb8b1b5f2ae 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
@@ -79,6 +79,7 @@ import org.apache.ignite.configuration.validation.Validator;
import org.apache.ignite.internal.BaseIgniteRestartTest;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogManagerImpl;
+import org.apache.ignite.internal.catalog.PartitionCountProvider;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
@@ -304,7 +305,8 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest
extends BaseIgniteRe
new UpdateLogImpl(metastore, failureProcessor),
clockService,
failureProcessor,
- () -> TEST_DELAY_DURATION
+ () -> TEST_DELAY_DURATION,
+ PartitionCountProvider.defaultPartitionCountProvider()
);
LowWatermark lowWatermark = mock(LowWatermark.class);
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 02b5cbc15c2..be7eefb9dbe 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -105,6 +105,7 @@ import
org.apache.ignite.configuration.validation.ConfigurationValidationExcepti
import org.apache.ignite.internal.app.ThreadPoolsManager;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogManagerImpl;
+import org.apache.ignite.internal.catalog.PartitionCountProvider;
import org.apache.ignite.internal.catalog.commands.ColumnParams;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
import org.apache.ignite.internal.cluster.management.ClusterIdHolder;
@@ -1496,7 +1497,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
new UpdateLogImpl(metaStorageManager, failureManager),
clockService,
failureManager,
- delayDurationMsSupplier
+ delayDurationMsSupplier,
+ PartitionCountProvider.defaultPartitionCountProvider()
);
indexMetaStorage = new IndexMetaStorage(catalogManager,
lowWatermark, metaStorageManager);
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index 7aeaa3351cf..bf006150022 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.app.NodePropertiesImpl;
import org.apache.ignite.internal.app.ThreadPoolsManager;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogManagerImpl;
+import org.apache.ignite.internal.catalog.PartitionCountProvider;
import org.apache.ignite.internal.catalog.compaction.CatalogCompactionRunner;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
@@ -666,7 +667,8 @@ public class Node {
new UpdateLogImpl(metaStorageManager, failureManager),
clockService,
failureManager,
- delayDurationMsSupplier
+ delayDurationMsSupplier,
+ PartitionCountProvider.defaultPartitionCountProvider()
);
raftManager.appendEntriesRequestInterceptor(new
CheckCatalogVersionOnAppendEntries(catalogManager));
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
index 7d6b1e216e3..dbd8f483114 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
@@ -65,6 +65,7 @@ import java.util.stream.IntStream;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogManagerImpl;
import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.PartitionCountProvider;
import org.apache.ignite.internal.catalog.commands.CatalogUtils;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
@@ -219,7 +220,8 @@ class PartitionReplicaLifecycleManagerTest extends
BaseIgniteAbstractTest {
new UpdateLogImpl(metaStorageManager, failureManager),
clockService,
failureManager,
- () -> TEST_DELAY_DURATION
+ () -> TEST_DELAY_DURATION,
+ PartitionCountProvider.defaultPartitionCountProvider()
);
replicaManager = spy(new ReplicaManager(
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 22fb3e072e9..82c92acf1b2 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -89,6 +89,7 @@ import org.apache.ignite.internal.app.NodePropertiesImpl;
import org.apache.ignite.internal.app.ThreadPoolsManager;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogManagerImpl;
+import org.apache.ignite.internal.catalog.PartitionCountProvider;
import org.apache.ignite.internal.catalog.commands.AlterZoneCommand;
import org.apache.ignite.internal.catalog.commands.AlterZoneCommandBuilder;
import org.apache.ignite.internal.catalog.commands.ColumnParams;
@@ -700,7 +701,8 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
new UpdateLogImpl(metaStorageMgr, failureProcessor),
clockService,
failureProcessor,
- delayDurationMsSupplier
+ delayDurationMsSupplier,
+ PartitionCountProvider.defaultPartitionCountProvider()
);
var indexMetaStorage = new IndexMetaStorage(catalogManager,
lowWatermark, metaStorageMgr);
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 33687ab6e40..9b510581afb 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -69,6 +69,7 @@ import org.apache.ignite.configuration.ConfigurationModule;
import org.apache.ignite.configuration.KeyIgnorer;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogManagerImpl;
+import org.apache.ignite.internal.catalog.PartitionCountProviderWrapper;
import org.apache.ignite.internal.catalog.compaction.CatalogCompactionRunner;
import
org.apache.ignite.internal.catalog.configuration.SchemaSynchronizationConfiguration;
import
org.apache.ignite.internal.catalog.configuration.SchemaSynchronizationExtensionConfiguration;
@@ -518,6 +519,8 @@ public class IgniteImpl implements Ignite {
private final PartitionModificationCounterFactory
partitionModificationCounterFactory;
+ private final PartitionCountProviderWrapper partitionCountProviderWrapper;
+
/** Future that completes when the node has joined the cluster. */
private final CompletableFuture<Ignite> joinFuture = new
CompletableFuture<>();
@@ -870,11 +873,14 @@ public class IgniteImpl implements Ignite {
LongSupplier delayDurationMsSupplier =
delayDurationMsSupplier(schemaSyncConfig);
+ partitionCountProviderWrapper = new PartitionCountProviderWrapper();
+
CatalogManagerImpl catalogManager = new CatalogManagerImpl(
new UpdateLogImpl(metaStorageMgr, failureManager),
clockService,
failureManager,
- delayDurationMsSupplier
+ delayDurationMsSupplier,
+ partitionCountProviderWrapper
);
ReplicationConfiguration replicationConfig = clusterConfigRegistry
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/catalog/PartitionCountProviderWrapper.java
b/modules/runner/src/main/java/org/apache/ignite/internal/catalog/PartitionCountProviderWrapper.java
new file mode 100644
index 00000000000..e6eb07bd5e8
--- /dev/null
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/catalog/PartitionCountProviderWrapper.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+/** {@link PartitionCountProvider} that forwards calculations to a delegate, which can be replaced at runtime. */
+public final class PartitionCountProviderWrapper implements PartitionCountProvider {
+    /** Provider that currently performs the calculation; volatile, so a replacement is visible to other threads. */
+    private volatile PartitionCountProvider provider;
+
+    /** Creates a wrapper backed by {@link PartitionCountProvider#defaultPartitionCountProvider}. */
+    public PartitionCountProviderWrapper() {
+        this(PartitionCountProvider.defaultPartitionCountProvider());
+    }
+
+    /** Creates a wrapper backed by the given {@code delegate}. */
+    public PartitionCountProviderWrapper(PartitionCountProvider delegate) {
+        provider = delegate;
+    }
+
+    /** Makes {@code newProvider} the delegate used by subsequent {@link #calculate} calls. */
+    public void setPartitionCountProvider(PartitionCountProvider newProvider) {
+        provider = newProvider;
+    }
+
+    @Override
+    public int calculate(PartitionCountCalculationParameters parameters) {
+        PartitionCountProvider current = provider;
+        return current.calculate(parameters);
+    }
+}
diff --git
a/modules/runner/src/test/java/org/apache/ignite/internal/catalog/PartitionCountProviderWrapperTest.java
b/modules/runner/src/test/java/org/apache/ignite/internal/catalog/PartitionCountProviderWrapperTest.java
new file mode 100644
index 00000000000..782e337a0a3
--- /dev/null
+++
b/modules/runner/src/test/java/org/apache/ignite/internal/catalog/PartitionCountProviderWrapperTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for {@link PartitionCountProviderWrapper}. */
+public class PartitionCountProviderWrapperTest extends BaseIgniteAbstractTest {
+    /**
+     * Tests that the default constructor uses the default partition count provider.
+     */
+    @Test
+    void testDefaultConstructor() {
+        PartitionCountProviderWrapper wrapper = new PartitionCountProviderWrapper();
+
+        PartitionCountCalculationParameters params = PartitionCountCalculationParameters.builder().build();
+        int result = wrapper.calculate(params);
+
+        assertEquals(DEFAULT_PARTITION_COUNT, result);
+    }
+
+    /**
+     * Tests that the constructor with custom provider correctly wraps it.
+     */
+    @Test
+    void testConstructorWithCustomProvider() {
+        int expectedPartitionCount = 42;
+
+        AtomicInteger callCount = new AtomicInteger(0);
+
+        PartitionCountProvider customProvider = params -> {
+            callCount.incrementAndGet();
+
+            return expectedPartitionCount;
+        };
+
+        PartitionCountProviderWrapper wrapper = new PartitionCountProviderWrapper(customProvider);
+
+        PartitionCountCalculationParameters calculationParameters = PartitionCountCalculationParameters.builder().build();
+        int result = wrapper.calculate(calculationParameters);
+
+        assertEquals(expectedPartitionCount, result);
+        // Sanity check: the custom value differs from the default, so the result cannot come from the default provider.
+        assertNotEquals(expectedPartitionCount, DEFAULT_PARTITION_COUNT);
+        assertEquals(1, callCount.get());
+    }
+
+    /**
+     * Tests that calculate uses new provider after it has been changed several times.
+     */
+    @Test
+    void testCalculateAfterProviderChange() {
+        PartitionCountProviderWrapper wrapper = new PartitionCountProviderWrapper();
+        PartitionCountCalculationParameters calculationParameters = PartitionCountCalculationParameters.builder().build();
+
+        assertEquals(DEFAULT_PARTITION_COUNT, wrapper.calculate(calculationParameters));
+
+        int expectedFirstlyChangedPartitionCount = 10;
+        PartitionCountProvider firstChangeProvider = params -> expectedFirstlyChangedPartitionCount;
+        wrapper.setPartitionCountProvider(firstChangeProvider);
+        assertEquals(expectedFirstlyChangedPartitionCount, wrapper.calculate(calculationParameters));
+        assertNotEquals(expectedFirstlyChangedPartitionCount, DEFAULT_PARTITION_COUNT);
+
+        int expectedLastlyChangedPartitionCount = 20;
+        PartitionCountProvider lastChangeProvider = params -> expectedLastlyChangedPartitionCount;
+        wrapper.setPartitionCountProvider(lastChangeProvider);
+        assertEquals(expectedLastlyChangedPartitionCount, wrapper.calculate(calculationParameters));
+        // Sanity checks: the last value differs from both the default and the first one, so each swap is observable.
+        assertNotEquals(expectedLastlyChangedPartitionCount, DEFAULT_PARTITION_COUNT);
+        assertNotEquals(expectedFirstlyChangedPartitionCount, expectedLastlyChangedPartitionCount);
+    }
+
+    /**
+     * Tests that calculate works correctly with different parameter combinations.
+     */
+    @Test
+    void testCalculateWithDifferentParameters() {
+        PartitionCountProvider doubleProvider = params -> params.replicaFactor() * 2;
+
+        PartitionCountProviderWrapper wrapper = new PartitionCountProviderWrapper(doubleProvider);
+
+        PartitionCountCalculationParameters params1 = PartitionCountCalculationParameters.builder()
+                .replicaFactor(2)
+                .build();
+        assertEquals(4, wrapper.calculate(params1));
+
+        PartitionCountCalculationParameters params2 = PartitionCountCalculationParameters.builder()
+                .replicaFactor(5)
+                .build();
+        assertEquals(10, wrapper.calculate(params2));
+
+        PartitionCountCalculationParameters params3 = PartitionCountCalculationParameters.builder()
+                .replicaFactor(10)
+                .build();
+        assertEquals(20, wrapper.calculate(params3));
+    }
+}