This is an automated email from the ASF dual-hosted git repository. zstan pushed a commit to branch ignite-26069 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 51ab2d350b171920bd6af72497e38e69cf8b06a7 Author: zstan <[email protected]> AuthorDate: Sat Oct 4 20:27:01 2025 +0300 test ok --- .../engine/statistic/SqlStatisticManagerImpl.java | 18 ------- .../sql/engine/statistic/StatisticAggregator.java | 61 ---------------------- .../distributed/PartitionModificationCounter.java | 59 +++++++-------------- .../PartitionModificationCounterFactory.java | 6 ++- .../internal/table/distributed/TableManager.java | 21 ++++++-- .../ignite/internal/table/TableTestUtils.java | 5 +- 6 files changed, 43 insertions(+), 127 deletions(-) diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java index abd8f3e80f9..a342485080a 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.sql.engine.statistic; import static org.apache.ignite.internal.event.EventListener.fromConsumer; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; -import it.unimi.dsi.fastutil.longs.LongObjectImmutablePair; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -107,23 +106,6 @@ public class SqlStatisticManagerImpl implements SqlStatisticManager { */ @Override public long tableSize(int tableId) { -/* TableViewInternal tableView = tableManager.cachedTable(tableId); - - if (tableView == null) { - return 1; - } - - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - - CompletableFuture<LongObjectImmutablePair<HybridTimestamp>> updateResult = statAggregator.estimatedSizeWithLastUpdate( - tableView.internalTable()); - - updateResult.join();*/ - return tableSizeMap.computeIfAbsent(tableId, k -> DEFAULT_VALUE).getSize(); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java index 003560f7008..f20dc4c9d6e 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java @@ -63,49 +63,6 @@ public class StatisticAggregator { public void messaging(MessagingService messagingService) { this.messagingService = messagingService; - - messagingService.addMessageHandler(PartitionReplicationMessageGroup.class, this::handleMessage); - -/* messagingService.addMessageHandler(SqlQueryMessageGroup.class, (message, sender, correlationId) -> { - if (message instanceof GetEstimatedSizeWithLastModifiedTsRequest) { - GetEstimatedSizeWithLastModifiedTsRequest msg = (GetEstimatedSizeWithLastModifiedTsRequest) message; - - TableViewInternal tableView = tableManager.cachedTable(msg.tableId()); - - if (tableView == null) { - LOG.debug("No table found to update statistics [id={}].", msg.tableId()); - - return; - } - - InternalTable table = tableView.internalTable(); - - for (int p = 0 ; p < table.partitions(); ++p) { - MvPartitionStorage mvPartition = table.storage().getMvPartition(p); - - if (mvPartition != null) { - LeaseInfo info = mvPartition.leaseInfo(); - - if (info != null) { - if (info.primaryReplicaNodeName().equals(nodeName)) { - mvPartition.estimatedSize(); - } - } - } - } - - storageAccessExecutor.execute(() -> handleHasDataRequest(msg, sender, correlationId)); - } - });*/ - - //InternalClusterNode localNode = clusterSrvc.topologyService().localMember(); - //String nodeName = localNode.name(); - } - - private void handleMessage(NetworkMessage message, InternalClusterNode sender, @Nullable Long correlationId) { - if (message instanceof GetEstimatedSizeWithLastModifiedTsResponse) { - System.err.println("!!!!"); - } } /** @@ -138,24 +95,6 @@ public class StatisticAggregator { GetEstimatedSizeWithLastModifiedTsRequest request = PARTITION_REPLICATION_MESSAGES_FACTORY.getEstimatedSizeWithLastModifiedTsRequest() .tableId(table.tableId()).build(); -/* for (String p : peers) { - @Nullable InternalClusterNode cons = topologyService.getByConsistentId(p); - if (cons == null) - continue; - - CompletableFuture<NetworkMessage> fut = messagingService.invoke(cons, request, REQUEST_ESTIMATION_TIMEOUT_MILLIS); - - fut.thenApply(res -> { - System.err.println(); - return null; - }).exceptionally(ex -> { - System.err.println(); - return null; - }); - - fut.join(); - }*/ - CompletableFuture<LongObjectImmutablePair<HybridTimestamp>>[] invokeFutures = peers.stream() .map(topologyService::getByConsistentId) .filter(Objects::nonNull) diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java index c18ccad923d..9c6cd8994d1 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java @@ -43,6 +43,10 @@ public class PartitionModificationCounter { private final double staleRowsFraction; private final long minStaleRowsCount; + int tableId; + int partitionId; + LongSupplier estimateSize; + private final AtomicLong counter = new AtomicLong(0); private volatile long nextMilestone; private volatile HybridTimestamp lastMilestoneReachedTimestamp; @@ -58,7 +62,8 @@ public class PartitionModificationCounter { long minStaleRowsCount, int tableId, int partitionId, - MessagingService messagingService + MessagingService messagingService, + LongSupplier estimateSize ) { Objects.requireNonNull(initTimestamp, "initTimestamp"); Objects.requireNonNull(partitionSizeSupplier, "partitionSizeSupplier"); @@ -81,6 +86,10 @@ public class PartitionModificationCounter { this.messagingService = messagingService; messagingService.addMessageHandler(PartitionReplicationMessageGroup.class, this::handleMessage); + + this.tableId = tableId; + this.partitionId = partitionId; + this.estimateSize = estimateSize; } private void handleMessage(NetworkMessage message, InternalClusterNode sender, @Nullable Long correlationId) { @@ -94,48 +103,18 @@ public class PartitionModificationCounter { InternalClusterNode sender, @Nullable Long correlationId ) { - GetEstimatedSizeWithLastModifiedTsRequest msg = (GetEstimatedSizeWithLastModifiedTsRequest) message; - - CompletableFuture<Void> fut = messagingService.respond( - sender, - PARTITION_REPLICATION_MESSAGES_FACTORY.getEstimatedSizeWithLastModifiedTsResponse().estimatedSize(100).ts(HybridTimestamp.MAX_VALUE).build(), - correlationId - ); - - System.err.println(); - -/* messagingService.respond( - sender, - PARTITION_REPLICATION_MESSAGES_FACTORY.hasDataResponse().build(), - correlationId - );*/ - - -/* TableViewInternal tableView = tableManager.cachedTable(msg.tableId()); + long estSize = estimateSize.getAsLong(); - if (tableView == null) { - LOG.debug("No table found to update statistics [id={}].", msg.tableId()); + if (tableId == message.tableId() && estSize != -1) { + System.err.println("!!!!! send size: " + estSize); - return; + messagingService.respond( + sender, + PARTITION_REPLICATION_MESSAGES_FACTORY + .getEstimatedSizeWithLastModifiedTsResponse().estimatedSize(estSize).ts(lastMilestoneTimestamp()).build(), + correlationId + ); } - - InternalTable table = tableView.internalTable(); - - for (int p = 0 ; p < table.partitions(); ++p) { - MvPartitionStorage mvPartition = table.storage().getMvPartition(p); - - if (mvPartition != null) { - LeaseInfo info = mvPartition.leaseInfo(); - - if (info != null) { - if (info.primaryReplicaNodeName().equals(nodeName)) { - mvPartition.estimatedSize(); - } - } - } - } - - storageAccessExecutor.execute(() -> handleHasDataRequest(msg, sender, correlationId));*/ } /** Returns the current counter value. */ diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java index ef506461751..e6abeadde4a 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java @@ -46,7 +46,8 @@ public class PartitionModificationCounterFactory { int tableId, int partitionId, LongSupplier partitionSizeSupplier, - MessagingService messagingService + MessagingService messagingService, + LongSupplier estimateSize ) { return new PartitionModificationCounter( currentTimestampSupplier.get(), @@ -55,7 +56,8 @@ public class PartitionModificationCounterFactory { DEFAULT_MIN_STALE_ROWS_COUNT, tableId, partitionId, - messagingService + messagingService, + estimateSize ); } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 75df7c230b6..9887115d124 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -1041,13 +1041,19 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { partitionStorages.getMvPartitionStorage() ); + LongSupplier estSizeSupplier = () -> { + @Nullable MvPartitionStorage partition = table.internalTable().storage().getMvPartition(partId); + return partition == null ? -1 : partition.estimatedSize(); + }; + PartitionUpdateHandlers partitionUpdateHandlers = createPartitionUpdateHandlers( partId, partitionDataStorage, table, safeTimeTracker, replicationConfiguration, - messagingService0 + messagingService0, + estSizeSupplier ); internalTbl.updatePartitionTrackers(partId, safeTimeTracker, storageIndexTracker); @@ -1393,13 +1399,19 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { storageIndexTracker.update(partitionDataStorage.lastAppliedIndex(), null); + LongSupplier estSizeSupplier = () -> { + @Nullable MvPartitionStorage partition = table.internalTable().storage().getMvPartition(partId); + return partition == null ? -1 : partition.estimatedSize(); + }; + PartitionUpdateHandlers partitionUpdateHandlers = createPartitionUpdateHandlers( partId, partitionDataStorage, table, safeTimeTracker, replicationConfiguration, - messagingService0 + messagingService0, + estSizeSupplier ); internalTbl.updatePartitionTrackers(partId, safeTimeTracker, storageIndexTracker); @@ -3157,7 +3169,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { TableViewInternal table, PendingComparableValuesTracker<HybridTimestamp, Void> safeTimeTracker, ReplicationConfiguration replicationConfiguration, - MessagingService messagingService + MessagingService messagingService, + LongSupplier estimateSize ) { TableIndexStoragesSupplier indexes = table.indexStorageAdapters(partitionId); @@ -3167,7 +3180,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { LongSupplier partSizeSupplier = () -> partitionDataStorage.getStorage().estimatedSize(); PartitionModificationCounter modificationCounter = partitionModificationCounterFactory - .create(table.tableId(), partitionId, partSizeSupplier, messagingService); + .create(table.tableId(), partitionId, partSizeSupplier, messagingService, estimateSize); registerPartitionModificationCounterMetrics(table.tableId(), partitionId, modificationCounter); StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler( diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java index decd56de798..15c89505bdf 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java @@ -67,7 +67,7 @@ public class TableTestUtils { /** No-op partition modification counter. */ public static final PartitionModificationCounter NOOP_PARTITION_MODIFICATION_COUNTER = - new PartitionModificationCounter(HybridTimestamp.MIN_VALUE, () -> 0, 0, 0, 0, 0, null); + new PartitionModificationCounter(HybridTimestamp.MIN_VALUE, () -> 0, 0, 0, 0, 0, null, () -> 0L); /** No-op partition modification counter factory. */ public static PartitionModificationCounterFactory NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY = @@ -77,7 +77,8 @@ public class TableTestUtils { int tableId, int partitionId, LongSupplier partitionSizeSupplier, - MessagingService messagingService + MessagingService messagingService, + LongSupplier estimateSize ) { return NOOP_PARTITION_MODIFICATION_COUNTER; }
