This is an automated email from the ASF dual-hosted git repository. zstan pushed a commit to branch ignite-26069 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 320b67515ffe0769a7928185c15a3240b49ffdd2 Author: zstan <[email protected]> AuthorDate: Mon Oct 6 18:36:28 2025 +0300 fix --- .../org/apache/ignite/internal/app/IgniteImpl.java | 2 +- .../sql/engine/statistic/ItStatisticTest.java | 13 ++-- .../sql/engine/statistic/StatisticAggregator.java | 34 +++++----- .../distributed/PartitionModificationCounter.java | 70 +++++-------------- .../PartitionModificationCounterFactory.java | 22 +++--- .../PartitionModificationRequestHandler.java | 78 ++++++++++++++++++++++ .../internal/table/distributed/TableManager.java | 27 ++------ .../PartitionModificationCounterTest.java | 29 +++++--- .../ignite/internal/table/TableTestUtils.java | 13 ++-- 9 files changed, 157 insertions(+), 131 deletions(-) diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index cabc8e7437d..f973d36cf13 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -1114,7 +1114,7 @@ public class IgniteImpl implements Ignite { ); PartitionModificationCounterFactory partitionModificationCounterFactory = - new PartitionModificationCounterFactory(clockService::current); + new PartitionModificationCounterFactory(clockService::current, messagingServiceReturningToStorageOperationsPool); distributedTblMgr = new TableManager( name, diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java index 3c18899bbd5..ddba533b966 100644 --- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java +++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java @@ -39,20 +39,21 @@ import org.junit.jupiter.api.Test; public class ItStatisticTest extends BaseSqlIntegrationTest { private SqlStatisticManagerImpl sqlStatisticManager; - private static final String TWO_PART_ZONE = "zone_single_partition"; + private static final String TWO_REPL_ZONE = "zone1"; @BeforeAll void beforeAll() { sqlStatisticManager = (SqlStatisticManagerImpl) queryProcessor().sqlStatisticManager(); - sql(format("CREATE ZONE {} (PARTITIONS 2, REPLICAS 2) storage profiles ['default'];", TWO_PART_ZONE)); - sql(format("CREATE TABLE t(ID INTEGER PRIMARY KEY, VAL INTEGER) ZONE {}", TWO_PART_ZONE)); + sql(format("CREATE ZONE {} (REPLICAS 2) storage profiles ['default'];", TWO_REPL_ZONE)); + sql(format("CREATE TABLE t(ID INTEGER PRIMARY KEY, VAL INTEGER) ZONE {}", TWO_REPL_ZONE)); + //sql("CREATE TABLE t(ID INTEGER PRIMARY KEY, VAL INTEGER)"); } @AfterAll void afterAll() { sql("DROP TABLE IF EXISTS t;"); - sql(format("DROP ZONE IF EXISTS {}", TWO_PART_ZONE)); + sql(format("DROP ZONE IF EXISTS {}", TWO_REPL_ZONE)); } @AfterEach @@ -60,10 +61,10 @@ public class ItStatisticTest extends BaseSqlIntegrationTest { sql("DELETE FROM t;"); } - @Override + /*@Override protected int initialNodes() { // change !!! return 1; - } + }*/ @Test public void testTableSizeUpdates() throws InterruptedException { diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java index 10be0b2fe31..7d3f0345bd1 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java @@ -5,7 +5,9 @@ import static java.util.concurrent.CompletableFuture.allOf; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import it.unimi.dsi.fastutil.longs.LongObjectImmutablePair; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -50,7 +52,7 @@ public class StatisticAggregator { int partitions = table.partitions(); - Int2ObjectMap<String> peers = new Int2ObjectOpenHashMap<>(); + Map<Integer, String> peers = new HashMap<>(); for (int p = 0; p < partitions; ++p) { ReplicaMeta repl = placementDriver.getCurrentPrimaryReplica( @@ -67,23 +69,25 @@ public class StatisticAggregator { return CompletableFuture.completedFuture(LongObjectImmutablePair.of(0, HybridTimestamp.MIN_VALUE)); } - GetEstimatedSizeWithLastModifiedTsRequest request = PARTITION_REPLICATION_MESSAGES_FACTORY.getEstimatedSizeWithLastModifiedTsRequest() - .tableId(table.tableId()).build(); +/* GetEstimatedSizeWithLastModifiedTsRequest request = PARTITION_REPLICATION_MESSAGES_FACTORY.getEstimatedSizeWithLastModifiedTsRequest() + .tableId(table.tableId()).build();*/ - CompletableFuture<LongObjectImmutablePair<HybridTimestamp>>[] invokeFutures = peers.stream() - //.map(topologyService::getByConsistentId) - //.filter(Objects::nonNull) - .map(node -> messagingService - .invoke(node, request, REQUEST_ESTIMATION_TIMEOUT_MILLIS) - .thenApply(response -> { - assert response instanceof GetEstimatedSizeWithLastModifiedTsResponse : response; + CompletableFuture<LongObjectImmutablePair<HybridTimestamp>>[] invokeFutures = peers.entrySet().stream() + .map(ent -> { + GetEstimatedSizeWithLastModifiedTsRequest request = PARTITION_REPLICATION_MESSAGES_FACTORY.getEstimatedSizeWithLastModifiedTsRequest() + .tableId(table.tableId()).partitionId(ent.getKey()).build(); - GetEstimatedSizeWithLastModifiedTsResponse response0 = (GetEstimatedSizeWithLastModifiedTsResponse) response; + return messagingService.invoke(ent.getValue(), request, REQUEST_ESTIMATION_TIMEOUT_MILLIS) + .thenApply(response -> { + assert response instanceof GetEstimatedSizeWithLastModifiedTsResponse : response; - return LongObjectImmutablePair.of(response0.estimatedSize(), response0.ts()); - }) - .exceptionally(unused -> LongObjectImmutablePair.of(0, HybridTimestamp.MIN_VALUE))) - .toArray(CompletableFuture[]::new); + GetEstimatedSizeWithLastModifiedTsResponse response0 = (GetEstimatedSizeWithLastModifiedTsResponse) response; + + return LongObjectImmutablePair.of(response0.estimatedSize(), response0.ts()); + }) + .exceptionally(unused -> LongObjectImmutablePair.of(0, HybridTimestamp.MIN_VALUE)); + }) + .toArray(CompletableFuture[]::new);; return allOf(invokeFutures).thenApply(unused -> { HybridTimestamp last = HybridTimestamp.MIN_VALUE; diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java index 46eb979e1cc..672d8db4b30 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java @@ -21,13 +21,7 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongSupplier; import org.apache.ignite.internal.hlc.HybridTimestamp; -import org.apache.ignite.internal.network.InternalClusterNode; import org.apache.ignite.internal.network.MessagingService; -import org.apache.ignite.internal.network.NetworkMessage; -import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup; -import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory; -import org.apache.ignite.internal.partition.replicator.network.message.GetEstimatedSizeWithLastModifiedTsRequest; -import org.jetbrains.annotations.Nullable; /** * Keeps track of the number of modifications of a partition. @@ -41,31 +35,28 @@ public class PartitionModificationCounter { private final LongSupplier partitionSizeSupplier; private final double staleRowsFraction; private final long minStaleRowsCount; - - int tableId; - int partitionId; - LongSupplier estimateSize; + private final PartitionModificationRequestHandler partitionModificationRequestHandler; private final AtomicLong counter = new AtomicLong(0); private volatile long nextMilestone; private volatile HybridTimestamp lastMilestoneReachedTimestamp; - private final MessagingService messagingService; - private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY = - new PartitionReplicationMessagesFactory(); /** Constructor. */ public PartitionModificationCounter( - HybridTimestamp initTimestamp, - LongSupplier partitionSizeSupplier, - double staleRowsFraction, - long minStaleRowsCount, int tableId, int partitionId, MessagingService messagingService, - LongSupplier estimateSize + HybridTimestamp initTimestamp, + LongSupplier partitionSizeSupplier, + double staleRowsFraction, + long minStaleRowsCount ) { - Objects.requireNonNull(initTimestamp, "initTimestamp"); - Objects.requireNonNull(partitionSizeSupplier, "partitionSizeSupplier"); + lastMilestoneReachedTimestamp = Objects.requireNonNull(initTimestamp, "initTimestamp"); + this.partitionSizeSupplier = Objects.requireNonNull(partitionSizeSupplier, "partitionSizeSupplier"); + + this.partitionModificationRequestHandler = + new PartitionModificationRequestHandler(tableId, partitionId, messagingService, partitionSizeSupplier, + this::lastMilestoneTimestamp); if (staleRowsFraction < 0 || staleRowsFraction > 1) { throw new IllegalArgumentException("staleRowsFraction must be in [0, 1] range"); @@ -77,43 +68,8 @@ public class PartitionModificationCounter { this.staleRowsFraction = staleRowsFraction; this.minStaleRowsCount = minStaleRowsCount; - this.partitionSizeSupplier = partitionSizeSupplier; nextMilestone = computeNextMilestone(partitionSizeSupplier.getAsLong(), staleRowsFraction, minStaleRowsCount); - lastMilestoneReachedTimestamp = initTimestamp; - - this.messagingService = messagingService; - - messagingService.addMessageHandler(PartitionReplicationMessageGroup.class, this::handleMessage); - - this.tableId = tableId; - this.partitionId = partitionId; - this.estimateSize = estimateSize; - } - - private void handleMessage(NetworkMessage message, InternalClusterNode sender, @Nullable Long correlationId) { - if (message instanceof GetEstimatedSizeWithLastModifiedTsRequest) { - handleRequestCounter((GetEstimatedSizeWithLastModifiedTsRequest) message, sender, correlationId); - } - } - - private void handleRequestCounter( - GetEstimatedSizeWithLastModifiedTsRequest message, - InternalClusterNode sender, - @Nullable Long correlationId - ) { - long estSize = estimateSize.getAsLong(); - - System.err.println("!!! handleRequestCounter"); - - if (tableId == message.tableId() && estSize != -1) { - messagingService.respond( - sender, - PARTITION_REPLICATION_MESSAGES_FACTORY - .getEstimatedSizeWithLastModifiedTsResponse().estimatedSize(estSize).ts(lastMilestoneTimestamp()).build(), - correlationId - ); - } } /** Returns the current counter value. */ @@ -168,4 +124,8 @@ public class PartitionModificationCounter { ) { return Math.max((long) (currentSize * staleRowsFraction), minStaleRowsCount); } + + private static class PartitionModificationRequestsHandler { + + } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java index e6abeadde4a..cc67f2be0f0 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.table.distributed; +import java.util.function.Consumer; import java.util.function.LongSupplier; import java.util.function.Supplier; import org.apache.ignite.internal.hlc.HybridTimestamp; @@ -31,9 +32,11 @@ public class PartitionModificationCounterFactory { public static final double DEFAULT_STALE_ROWS_FRACTION = 0.2d; private final Supplier<HybridTimestamp> currentTimestampSupplier; + private final MessagingService messagingService; - public PartitionModificationCounterFactory(Supplier<HybridTimestamp> currentTimestampSupplier) { + public PartitionModificationCounterFactory(Supplier<HybridTimestamp> currentTimestampSupplier, MessagingService messagingService) { this.currentTimestampSupplier = currentTimestampSupplier; + this.messagingService = messagingService; } /** @@ -42,22 +45,15 @@ public class PartitionModificationCounterFactory { * @param partitionSizeSupplier Partition size supplier. * @return New partition modification counter. */ - public PartitionModificationCounter create( - int tableId, - int partitionId, - LongSupplier partitionSizeSupplier, - MessagingService messagingService, - LongSupplier estimateSize - ) { + public PartitionModificationCounter create(LongSupplier partitionSizeSupplier, int tableId, int partitionId) { return new PartitionModificationCounter( - currentTimestampSupplier.get(), - partitionSizeSupplier, - DEFAULT_STALE_ROWS_FRACTION, - DEFAULT_MIN_STALE_ROWS_COUNT, tableId, partitionId, messagingService, - estimateSize + currentTimestampSupplier.get(), + partitionSizeSupplier, + DEFAULT_STALE_ROWS_FRACTION, + DEFAULT_MIN_STALE_ROWS_COUNT ); } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationRequestHandler.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationRequestHandler.java new file mode 100644 index 00000000000..c5cf359bef1 --- /dev/null +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationRequestHandler.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed; + +import java.util.function.LongSupplier; +import java.util.function.Supplier; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.network.InternalClusterNode; +import org.apache.ignite.internal.network.MessagingService; +import org.apache.ignite.internal.network.NetworkMessage; +import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup; +import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory; +import org.apache.ignite.internal.partition.replicator.network.message.GetEstimatedSizeWithLastModifiedTsRequest; +import org.jetbrains.annotations.Nullable; + +/** Process partition modification requests. */ +class PartitionModificationRequestHandler { + private final int tableId; + private final int partitionId; + private final MessagingService messagingService; + private final LongSupplier partitionSizeSupplier; + private final Supplier<HybridTimestamp> lastMilestoneTsConsumer; + private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY = + new PartitionReplicationMessagesFactory(); + + PartitionModificationRequestHandler( + int tableId, + int partitionId, + MessagingService messagingService, + LongSupplier partitionSizeSupplier, + Supplier<HybridTimestamp> lastMilestoneTsConsumer + ) { + this.tableId = tableId; + this.partitionId = partitionId; + this.messagingService = messagingService; + this.partitionSizeSupplier = partitionSizeSupplier; + this.lastMilestoneTsConsumer = lastMilestoneTsConsumer; + + messagingService.addMessageHandler(PartitionReplicationMessageGroup.class, this::handleMessage); + } + + private void handleMessage(NetworkMessage message, InternalClusterNode sender, @Nullable Long correlationId) { + if (message instanceof GetEstimatedSizeWithLastModifiedTsRequest) { + handleRequestCounter((GetEstimatedSizeWithLastModifiedTsRequest) message, sender, correlationId); + } + } + + private void handleRequestCounter( + GetEstimatedSizeWithLastModifiedTsRequest message, + InternalClusterNode sender, + @Nullable Long correlationId + ) { + if (tableId == message.tableId() && partitionId == message.partitionId()) { + messagingService.respond( + sender, + PARTITION_REPLICATION_MESSAGES_FACTORY + .getEstimatedSizeWithLastModifiedTsResponse().estimatedSize(partitionSizeSupplier.getAsLong()) + .ts(lastMilestoneTsConsumer.get()).build(), + correlationId + ); + } + } +} diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index d812c6e9383..23702916ead 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -95,7 +95,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; -import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongSupplier; @@ -1042,19 +1041,12 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { partitionStorages.getMvPartitionStorage() ); - LongSupplier estSizeSupplier = () -> { - @Nullable MvPartitionStorage partition = table.internalTable().storage().getMvPartition(partId); - return partition == null ? -1 : partition.estimatedSize(); - }; - PartitionUpdateHandlers partitionUpdateHandlers = createPartitionUpdateHandlers( partId, partitionDataStorage, table, safeTimeTracker, - replicationConfiguration, - messagingService0, - estSizeSupplier + replicationConfiguration ); internalTbl.updatePartitionTrackers(partId, safeTimeTracker, storageIndexTracker); @@ -1400,19 +1392,12 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { storageIndexTracker.update(partitionDataStorage.lastAppliedIndex(), null); - LongSupplier estSizeSupplier = () -> { - @Nullable MvPartitionStorage partition = table.internalTable().storage().getMvPartition(partId); - return partition == null ? -1 : partition.estimatedSize(); - }; - PartitionUpdateHandlers partitionUpdateHandlers = createPartitionUpdateHandlers( partId, partitionDataStorage, table, safeTimeTracker, - replicationConfiguration, - messagingService0, - estSizeSupplier + replicationConfiguration ); internalTbl.updatePartitionTrackers(partId, safeTimeTracker, storageIndexTracker); @@ -3169,9 +3154,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { PartitionDataStorage partitionDataStorage, TableViewInternal table, PendingComparableValuesTracker<HybridTimestamp, Void> safeTimeTracker, - ReplicationConfiguration replicationConfiguration, - MessagingService messagingService, - LongSupplier estimateSize + ReplicationConfiguration replicationConfiguration ) { TableIndexStoragesSupplier indexes = table.indexStorageAdapters(partitionId); @@ -3181,8 +3164,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { LongSupplier partSizeSupplier = () -> partitionDataStorage.getStorage().estimatedSize(); - PartitionModificationCounter modificationCounter = partitionModificationCounterFactory - .create(table.tableId(), partitionId, partSizeSupplier, messagingService, estimateSize); + PartitionModificationCounter modificationCounter = + partitionModificationCounterFactory.create(partSizeSupplier, table.tableId(), partitionId); registerPartitionModificationCounterMetrics(table.tableId(), partitionId, modificationCounter); StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler( diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterTest.java index 40c0e31ef85..0f9b0e8772a 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterTest.java @@ -19,24 +19,33 @@ package org.apache.ignite.internal.table.distributed; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.network.MessagingService; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; /** * Tests for class {@link PartitionModificationCounter}. */ +@ExtendWith(MockitoExtension.class) public class PartitionModificationCounterTest extends BaseIgniteAbstractTest { + @Mock + private MessagingService messaging; + private final PartitionModificationCounterFactory factory = - new PartitionModificationCounterFactory(() -> HybridTimestamp.hybridTimestamp(1L)); + new PartitionModificationCounterFactory(() -> HybridTimestamp.hybridTimestamp(1L), mock(MessagingService.class)); @Test void initialValues() { // Empty table. { - PartitionModificationCounter counter = factory.create(() -> 0L); + PartitionModificationCounter counter = factory.create(() -> 0L, 0, 0); assertThat(counter.value(), is(0L)); assertThat(counter.nextMilestone(), is(PartitionModificationCounterFactory.DEFAULT_MIN_STALE_ROWS_COUNT)); @@ -45,7 +54,7 @@ public class PartitionModificationCounterTest extends BaseIgniteAbstractTest { // Table with 10k rows. { - PartitionModificationCounter counter = factory.create(() -> 10_000L); + PartitionModificationCounter counter = factory.create(() -> 10_000L, 0, 0); assertThat(counter.value(), is(0L)); assertThat(counter.nextMilestone(), is(2000L)); @@ -64,7 +73,7 @@ public class PartitionModificationCounterTest extends BaseIgniteAbstractTest { void lastMilestoneTimestampUpdate() { int rowsCount = 10_000; int threshold = (int) (rowsCount * PartitionModificationCounterFactory.DEFAULT_STALE_ROWS_FRACTION); - PartitionModificationCounter counter = factory.create(() -> rowsCount); + PartitionModificationCounter counter = factory.create(() -> rowsCount, 0, 0); assertThat(counter.lastMilestoneTimestamp().longValue(), is(1L)); @@ -91,7 +100,7 @@ public class PartitionModificationCounterTest extends BaseIgniteAbstractTest { @Test @SuppressWarnings({"ThrowableNotThrown", "ResultOfObjectAllocationIgnored", "DataFlowIssue"}) void invalidUpdateValues() { - PartitionModificationCounter counter = factory.create(() -> 0L); + PartitionModificationCounter counter = factory.create(() -> 0L, 0, 0); IgniteTestUtils.assertThrows(NullPointerException.class, () -> counter.updateValue(1, null), "commitTimestamp"); @@ -104,31 +113,31 @@ public class PartitionModificationCounterTest extends BaseIgniteAbstractTest { IgniteTestUtils.assertThrows( NullPointerException.class, - () -> new PartitionModificationCounter(null, () -> 0L, 0.0d, 0), + () -> new PartitionModificationCounter(0, 0, messaging, null, () -> 0L, 0.0d, 0), "initTimestamp" ); IgniteTestUtils.assertThrows( NullPointerException.class, - () -> new PartitionModificationCounter(HybridTimestamp.MIN_VALUE, null, 0.0d, 0), + () -> new PartitionModificationCounter(0, 0, messaging, HybridTimestamp.MIN_VALUE, null, 0.0d, 0), "partitionSizeSupplier" ); IgniteTestUtils.assertThrows( IllegalArgumentException.class, - () -> new PartitionModificationCounter(HybridTimestamp.MIN_VALUE, () -> 0L, 1.1d, 0), + () -> new PartitionModificationCounter(0, 0, messaging, HybridTimestamp.MIN_VALUE, () -> 0L, 1.1d, 0), "staleRowsFraction must be in [0, 1] range" ); IgniteTestUtils.assertThrows( IllegalArgumentException.class, - () -> new PartitionModificationCounter(HybridTimestamp.MIN_VALUE, () -> 0L, -0.1d, 0), + () -> new PartitionModificationCounter(0, 0, messaging, HybridTimestamp.MIN_VALUE, () -> 0L, -0.1d, 0), "staleRowsFraction must be in [0, 1] range" ); IgniteTestUtils.assertThrows( IllegalArgumentException.class, - () -> new PartitionModificationCounter(HybridTimestamp.MIN_VALUE, () -> 0L, -0.1d, -1), + () -> new PartitionModificationCounter(0, 0, messaging, HybridTimestamp.MIN_VALUE, () -> 0L, -0.1d, -1), "staleRowsFraction must be in [0, 1] range" ); } diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java index 15c89505bdf..1550e16cb73 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java @@ -23,6 +23,7 @@ import static org.apache.ignite.internal.testframework.matchers.CompletableFutur import static org.apache.ignite.sql.ColumnType.INT32; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.Mockito.mock; import java.util.List; import java.util.function.LongSupplier; @@ -67,19 +68,13 @@ public class TableTestUtils { /** No-op partition modification counter. */ public static final PartitionModificationCounter NOOP_PARTITION_MODIFICATION_COUNTER = - new PartitionModificationCounter(HybridTimestamp.MIN_VALUE, () -> 0, 0, 0, 0, 0, null, () -> 0L); + new PartitionModificationCounter(0, 0, mock(MessagingService.class), HybridTimestamp.MIN_VALUE, () -> 0, 0, 0); /** No-op partition modification counter factory. */ public static PartitionModificationCounterFactory NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY = - new PartitionModificationCounterFactory(() -> HybridTimestamp.MIN_VALUE) { + new PartitionModificationCounterFactory(() -> HybridTimestamp.MIN_VALUE, mock(MessagingService.class)) { @Override - public PartitionModificationCounter create( - int tableId, - int partitionId, - LongSupplier partitionSizeSupplier, - MessagingService messagingService, - LongSupplier estimateSize - ) { + public PartitionModificationCounter create(LongSupplier partitionSizeSupplier, int tableId, int partitionId) { return NOOP_PARTITION_MODIFICATION_COUNTER; } };
