This is an automated email from the ASF dual-hosted git repository. zstan pushed a commit to branch ignite-26069 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit ba4a8ec0a6d7e0b6bdd00e5c7d9b3371f2187042 Author: zstan <[email protected]> AuthorDate: Wed Oct 1 17:29:33 2025 +0300 wip --- .../distributionzones/DataNodesManager.java | 2 +- .../TableAwareReplicaRequestPreProcessor.java | 2 - .../network/PartitionReplicationMessageGroup.java | 6 -- .../GetEstimatedSizeWithLastModifiedTsRequest.java | 30 ------ .../internal/sql/engine/SqlQueryProcessor.java | 9 +- .../GetEstimatedSizeWithLastModifiedTsRequest.java | 10 ++ .../sql/engine/message/SqlQueryMessageGroup.java | 2 + .../engine/statistic/SqlStatisticManagerImpl.java | 6 +- .../sql/engine/statistic/StatisticAggregator.java | 115 +++++++++++++++++++++ .../ignite/internal/table/InternalTable.java | 9 -- .../internal/table/distributed/TableManager.java | 3 + .../replicator/PartitionReplicaListener.java | 5 - .../distributed/storage/InternalTableImpl.java | 31 +++++- 13 files changed, 173 insertions(+), 57 deletions(-) diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesManager.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesManager.java index cd3bad009bc..9c47bd6da55 100644 --- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesManager.java +++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesManager.java @@ -848,7 +848,7 @@ public class DataNodesManager { } /** - * Returns data nodes for the given zone and timestamp. See {@link #dataNodes(int, HybridTimestamp)}. + * Returns data nodes for the given zone and timestamp. See {@link #dataNodes(int, HybridTimestamp, Integer)}. * Catalog version is calculated by the given timestamp. * * @param zoneId Zone ID. diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TableAwareReplicaRequestPreProcessor.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TableAwareReplicaRequestPreProcessor.java index 967606c3ae7..17f25b12c00 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TableAwareReplicaRequestPreProcessor.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TableAwareReplicaRequestPreProcessor.java @@ -24,7 +24,6 @@ import org.apache.ignite.internal.hlc.ClockService; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest; import org.apache.ignite.internal.partition.replicator.network.replication.GetEstimatedSizeRequest; -import org.apache.ignite.internal.partition.replicator.network.replication.GetEstimatedSizeWithLastModifiedTsRequest; import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyReplicaRequest; import org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteReplicaRequest; import org.apache.ignite.internal.partition.replicator.network.replication.ScanCloseReplicaRequest; @@ -97,7 +96,6 @@ public class TableAwareReplicaRequestPreProcessor { assert txTs == null ? request instanceof GetEstimatedSizeRequest || request instanceof ScanCloseReplicaRequest - || request instanceof GetEstimatedSizeWithLastModifiedTsRequest || request instanceof BuildIndexReplicaRequest || request instanceof TableWriteIntentSwitchReplicaRequest : opTs.compareTo(txTs) >= 0 : "Invalid request timestamps [request=" + request + ']'; diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java index d7bebbf6397..09c2f55e802 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java @@ -53,7 +53,6 @@ import org.apache.ignite.internal.partition.replicator.network.replication.Binar import org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest; import org.apache.ignite.internal.partition.replicator.network.replication.ChangePeersAndLearnersAsyncReplicaRequest; import org.apache.ignite.internal.partition.replicator.network.replication.GetEstimatedSizeRequest; -import org.apache.ignite.internal.partition.replicator.network.replication.GetEstimatedSizeWithLastModifiedTsRequest; import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyDirectMultiRowReplicaRequest; import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyDirectSingleRowReplicaRequest; import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyMultiRowPkReplicaRequest; @@ -219,11 +218,6 @@ public interface PartitionReplicationMessageGroup { */ short CHANGE_PEERS_AND_LEARNERS_ASYNC_REPLICA_REQUEST = 28; - /** - * Message type for {@link GetEstimatedSizeWithLastModifiedTsRequest}. - */ - short GET_ESTIMATED_SIZE_WITH_MODIFIED_TS_MESSAGE = 51; - /** * Message types for partition replicator module RAFT commands. * diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeWithLastModifiedTsRequest.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeWithLastModifiedTsRequest.java deleted file mode 100644 index 30d3eabb686..00000000000 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeWithLastModifiedTsRequest.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.partition.replicator.network.replication; - -import org.apache.ignite.internal.network.annotations.Transferable; -import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup; -import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest; -import org.apache.ignite.internal.replicator.message.TableAware; - -/** - * Request for getting an estimated size of a partition with last modification timestamp. - */ -@Transferable(PartitionReplicationMessageGroup.GET_ESTIMATED_SIZE_WITH_MODIFIED_TS_MESSAGE) -public interface GetEstimatedSizeWithLastModifiedTsRequest extends PrimaryReplicaRequest, TableAware { -} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java index 6f383c65244..5fce14f8488 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java @@ -97,6 +97,7 @@ import org.apache.ignite.internal.sql.engine.sql.ParsedResult; import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl; import org.apache.ignite.internal.sql.engine.statistic.SqlStatisticManager; import org.apache.ignite.internal.sql.engine.statistic.SqlStatisticManagerImpl; +import org.apache.ignite.internal.sql.engine.statistic.StatisticAggregator; import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext; import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContextImpl; import org.apache.ignite.internal.sql.engine.util.Commons; @@ -205,6 +206,8 @@ public class SqlQueryProcessor implements QueryProcessor, SystemViewProvider { private final EventLog eventLog; + private final StatisticAggregator statAggregator; + /** Constructor. */ public SqlQueryProcessor( ClusterService clusterSrvc, @@ -252,7 +255,8 @@ public class SqlQueryProcessor implements QueryProcessor, SystemViewProvider { this.killCommandHandler = killCommandHandler; this.eventLog = eventLog; - sqlStatisticManager = new SqlStatisticManagerImpl(tableManager, catalogManager, lowWaterMark, commonScheduler); + statAggregator = new StatisticAggregator(placementDriver, clockService::current, tableManager); + sqlStatisticManager = new SqlStatisticManagerImpl(tableManager, catalogManager, lowWaterMark, commonScheduler, statAggregator); sqlSchemaManager = new SqlSchemaManagerImpl( catalogManager, sqlStatisticManager, @@ -307,6 +311,9 @@ public class SqlQueryProcessor implements QueryProcessor, SystemViewProvider { clockService )); + statAggregator.messaging(clusterSrvc.messagingService()); + statAggregator.nodeName(nodeName); + var exchangeService = registerService(new ExchangeServiceImpl( mailboxRegistry, msgSrvc, diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/GetEstimatedSizeWithLastModifiedTsRequest.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/GetEstimatedSizeWithLastModifiedTsRequest.java new file mode 100644 index 00000000000..a9ed8d493ce --- /dev/null +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/GetEstimatedSizeWithLastModifiedTsRequest.java @@ -0,0 +1,10 @@ +package org.apache.ignite.internal.sql.engine.message; + +import org.apache.ignite.internal.network.NetworkMessage; +import org.apache.ignite.internal.network.annotations.Transferable; + +@Transferable(SqlQueryMessageGroup.GET_ESTIMATED_SIZE_WITH_MODIFIED_TS_MESSAGE) +public interface GetEstimatedSizeWithLastModifiedTsRequest extends NetworkMessage { + /** ID of the table. */ + int tableId(); +} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java index ea0d1f1509a..8bbd77d0d7c 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java @@ -46,4 +46,6 @@ public final class SqlQueryMessageGroup { /** See {@link CancelOperationResponse} for the details. */ public static final short OPERATION_CANCEL_RESPONSE = 7; + + public static final short GET_ESTIMATED_SIZE_WITH_MODIFIED_TS_MESSAGE = 8; } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java index e07f355f634..048655ad5c8 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java @@ -76,6 +76,8 @@ public class SqlStatisticManagerImpl implements SqlStatisticManager { Set<Integer> droppedTables = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final ScheduledExecutorService scheduler; + private final StatisticAggregator statAggregator; + static final long INITIAL_DELAY = 5_000; static final long REFRESH_PERIOD = 5_000; @@ -84,12 +86,14 @@ public class SqlStatisticManagerImpl implements SqlStatisticManager { TableManager tableManager, CatalogService catalogService, LowWatermark lowWatermark, - ScheduledExecutorService scheduler + ScheduledExecutorService scheduler, + StatisticAggregator statAggregator ) { this.tableManager = tableManager; this.catalogService = catalogService; this.lowWatermark = lowWatermark; this.scheduler = scheduler; + this.statAggregator = statAggregator; } /** diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java new file mode 100644 index 00000000000..eca0f32cfd0 --- /dev/null +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java @@ -0,0 +1,115 @@ +package org.apache.ignite.internal.sql.engine.statistic; + +import it.unimi.dsi.fastutil.longs.LongObjectImmutablePair; +import java.util.HashSet; +import java.util.Set; +import java.util.function.Supplier; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.network.InternalClusterNode; +import org.apache.ignite.internal.network.MessagingService; +import org.apache.ignite.internal.placementdriver.PlacementDriver; +import org.apache.ignite.internal.placementdriver.ReplicaMeta; +import org.apache.ignite.internal.sql.engine.message.GetEstimatedSizeWithLastModifiedTsRequest; +import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup; +import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory; +import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.storage.lease.LeaseInfo; +import org.apache.ignite.internal.table.InternalTable; +import org.apache.ignite.internal.table.TableViewInternal; +import org.apache.ignite.internal.table.distributed.TableManager; +import org.jetbrains.annotations.Nullable; + +public class StatisticAggregator { + private static final IgniteLogger LOG = Loggers.forClass(StatisticAggregator.class); + private final PlacementDriver placementDriver; + private final Supplier<HybridTimestamp> currentClock; + private @Nullable MessagingService messagingService; + private @Nullable String nodeName; + private final TableManager tableManager; + private static final SqlQueryMessagesFactory MSG_FACTORY = new SqlQueryMessagesFactory(); + + public StatisticAggregator( + PlacementDriver placementDriver, + Supplier<HybridTimestamp> currentClock, + TableManager tableManager + ) { + this.placementDriver = placementDriver; + this.currentClock = currentClock; + this.tableManager = tableManager; + } + + public void nodeName(String nodeName) { + this.nodeName = nodeName; + } + + public void messaging(MessagingService messagingService) { + this.messagingService = messagingService; + + messagingService.addMessageHandler(SqlQueryMessageGroup.class, (message, sender, correlationId) -> { + if (message instanceof GetEstimatedSizeWithLastModifiedTsRequest) { + GetEstimatedSizeWithLastModifiedTsRequest msg = (GetEstimatedSizeWithLastModifiedTsRequest) message; + + TableViewInternal tableView = tableManager.cachedTable(msg.tableId()); + + if (tableView == null) { + LOG.debug("No table found to update statistics [id={}].", msg.tableId()); + + return; + } + + InternalTable table = tableView.internalTable(); + + for (int p = 0 ; p < table.partitions(); ++p) { + MvPartitionStorage mvPartition = table.storage().getMvPartition(p); + + if (mvPartition != null) { + LeaseInfo info = mvPartition.leaseInfo(); + + if (info != null) { + if (info.primaryReplicaNodeName().equals(nodeName)) { + mvPartition.estimatedSize(); + } + } + } + } + + storageAccessExecutor.execute(() -> handleHasDataRequest(msg, sender, correlationId)); + } + }); + } + + private void onMessage(InternalClusterNode node, GetEstimatedSizeWithLastModifiedTsRequest msg) { + assert node != null && msg != null; + } + + /** + * Returns the pair<<em>last modification timestamp</em>, <em>estimated size</em>> of this table. + * + * @return Estimated size of this table with last modification timestamp. + */ + public LongObjectImmutablePair<HybridTimestamp> estimatedSizeWithLastUpdate(InternalTable table) { + int partitions = table.partitions(); + + Set<String> peers = new HashSet<>(); + + for (int p = 0; p < partitions; ++p) { + ReplicaMeta repl = placementDriver.getCurrentPrimaryReplica( + table.targetReplicationGroupId(p), currentClock.get()); + + if (repl != null) { + peers.add(repl.getLeaseholder()); + } else { + assert false; // !!! delete + } + } + + GetEstimatedSizeWithLastModifiedTsRequest request = MSG_FACTORY.getEstimatedSizeWithLastModifiedTsRequest() + .tableId(table.tableId()).build(); + + for (String node : peers) { + messageService.send(node, request); + } + } +} diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java index dde61214d3f..ca757a58232 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java @@ -540,15 +540,6 @@ public interface InternalTable extends ManuallyCloseable { */ CompletableFuture<Long> estimatedSize(); - /** - * Returns the pair<<em>last modification timestamp</em>, <em>estimated size</em>> of this table. - * - * @return Estimated size of this table with last modification timestamp. - * - * @see #estimatedSize - */ - CompletableFuture<LongObjectImmutablePair<HybridTimestamp>> estimatedSizeWithLastUpdate(); - /** * Returns the streamer receiver runner. * diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index f48e53c6c4a..648c3c1354f 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -1396,6 +1396,9 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { replicationConfiguration ); + partitionUpdateHandlers.storageUpdateHandler.lastModificationCounterMilestone(); + //partitionDataStorage.getStorage().estimatedSize(); + internalTbl.updatePartitionTrackers(partId, safeTimeTracker, storageIndexTracker); mvGc.addStorage(replicaGrpId, partitionUpdateHandlers.gcUpdateHandler); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index 82e3da097b9..a7a12560198 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -127,7 +127,6 @@ import org.apache.ignite.internal.partition.replicator.network.replication.Binar import org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest; import org.apache.ignite.internal.partition.replicator.network.replication.ChangePeersAndLearnersAsyncReplicaRequest; import org.apache.ignite.internal.partition.replicator.network.replication.GetEstimatedSizeRequest; -import org.apache.ignite.internal.partition.replicator.network.replication.GetEstimatedSizeWithLastModifiedTsRequest; import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyDirectMultiRowReplicaRequest; import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyDirectSingleRowReplicaRequest; import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyMultiRowPkReplicaRequest; @@ -592,10 +591,6 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr return processGetEstimatedSizeRequest(); } - if (request instanceof GetEstimatedSizeWithLastModifiedTsRequest) { - return processGetEstimatedSizeWithTsRequest(); - } - if (request instanceof ChangePeersAndLearnersAsyncReplicaRequest) { return processChangePeersAndLearnersReplicaRequest((ChangePeersAndLearnersAsyncReplicaRequest) request); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java index 3643ed10cac..febfbdd1169 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java @@ -2211,9 +2211,34 @@ public class InternalTableImpl implements InternalTable { return streamerFlushExecutor.get(); } +/* @Override public CompletableFuture<LongObjectImmutablePair<HybridTimestamp>> estimatedSizeWithLastUpdate() { - HybridTimestamp now = clockService.current(); + //return CompletableFuture.completedFuture(null); + + Set<String> peers = new HashSet<>(); + + for (int partId = 0; partId < partitions; partId++) { + ReplicationGroupId replicaGroupId = targetReplicationGroupId(partId); + + ReplicaMeta meta = placementDriver.getCurrentPrimaryReplica(replicaGroupId, clockService.current()); + + if (meta != null) { + peers.add(meta.getLeaseholder()); + } else { + assert false; //!!! remove it + } + } + + GetEstimatedSizeWithLastModifiedTsRequest request = TABLE_MESSAGES_FACTORY.getEstimatedSizeWithLastModifiedTsRequest() + .tableId(tableId).build(); + + for (String leaseHolderName : peers) { + + } + +*/ +/* HybridTimestamp now = clockService.current(); var invokeFutures = new CompletableFuture<?>[partitions]; @@ -2250,8 +2275,10 @@ public class InternalTableImpl implements InternalTable { count += result.keyLong(); } return LongObjectImmutablePair.of(count, last); - }); + });*//* + } +*/ @Override public CompletableFuture<Long> estimatedSize() {
