This is an automated email from the ASF dual-hosted git repository. zstan pushed a commit to branch ignite-26069 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit f79f36c484979361b69320ed37b8a04489b351d9 Author: zstan <[email protected]> AuthorDate: Tue Oct 7 12:40:44 2025 +0300 fix --- .../GetEstimatedSizeWithLastModifiedTsRequest.java | 1 + ...GetEstimatedSizeWithLastModifiedTsResponse.java | 5 +-- .../engine/statistic/SqlStatisticManagerImpl.java | 6 ++- .../sql/engine/statistic/StatisticAggregator.java | 45 +++++++++++----------- .../PartitionModificationCounterHandler.java | 2 +- 5 files changed, 30 insertions(+), 29 deletions(-) diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/message/GetEstimatedSizeWithLastModifiedTsRequest.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/message/GetEstimatedSizeWithLastModifiedTsRequest.java index db964733443..75a5bed50e5 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/message/GetEstimatedSizeWithLastModifiedTsRequest.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/message/GetEstimatedSizeWithLastModifiedTsRequest.java @@ -21,6 +21,7 @@ import org.apache.ignite.internal.network.NetworkMessage; import org.apache.ignite.internal.network.annotations.Transferable; import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup; +/** A message that queries a partition estimate size and last modification ts. */ @Transferable(PartitionReplicationMessageGroup.GET_ESTIMATED_SIZE_WITH_MODIFIED_TS_MESSAGE_REQUEST) public interface GetEstimatedSizeWithLastModifiedTsRequest extends NetworkMessage { /** ID of the table. */ diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/message/GetEstimatedSizeWithLastModifiedTsResponse.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/message/GetEstimatedSizeWithLastModifiedTsResponse.java index b6aac4f6384..365d2aafb0d 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/message/GetEstimatedSizeWithLastModifiedTsResponse.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/message/GetEstimatedSizeWithLastModifiedTsResponse.java @@ -22,11 +22,10 @@ import org.apache.ignite.internal.network.NetworkMessage; import org.apache.ignite.internal.network.annotations.Transferable; import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup; +/** A response to the {@link GetEstimatedSizeWithLastModifiedTsRequest}. */ @Transferable(PartitionReplicationMessageGroup.GET_ESTIMATED_SIZE_WITH_MODIFIED_TS_MESSAGE_RESPONSE) public interface GetEstimatedSizeWithLastModifiedTsResponse extends NetworkMessage { - HybridTimestamp ts(); + HybridTimestamp lastModified(); long estimatedSize(); - - int partitionId(); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java index e409945d19a..88209ef33c7 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java @@ -129,9 +129,12 @@ public class SqlStatisticManagerImpl implements SqlStatisticManager { } private void update() { + System.err.println("!!!! update call"); for (Map.Entry<Integer, ActualSize> ent : tableSizeMap.entrySet()) { Integer tableId = ent.getKey(); + System.err.println("!!!! update call " + tableId); + if (droppedTables.contains(tableId)) { continue; } @@ -143,8 +146,6 @@ public class SqlStatisticManagerImpl implements SqlStatisticManager { continue; } - System.err.println("!!!call estimatedSizeWithLastUpdate " + tableView.internalTable().tableId()); - CompletableFuture<Void> updateResult = statAggregator.estimatedSizeWithLastUpdate(tableView.internalTable()) .thenAccept(res -> { // the table can be concurrently dropped and we shouldn't put new value in this case. @@ -157,6 +158,7 @@ public class SqlStatisticManagerImpl implements SqlStatisticManager { return new ActualSize(Math.max(res.keyLong(), DEFAULT_TABLE_SIZE), res.value()); }); }).exceptionally(e -> { + System.err.println("Can't calculate size for table: " + tableId); LOG.info("Can't calculate size for table [id={}].", e, tableId); return null; }); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java index 7d3f0345bd1..46e8470f756 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java @@ -1,25 +1,25 @@ package org.apache.ignite.internal.sql.engine.statistic; import static java.util.concurrent.CompletableFuture.allOf; +import static org.apache.ignite.internal.util.IgniteUtils.newHashMap; +import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR; -import it.unimi.dsi.fastutil.ints.Int2ObjectMap; -import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import it.unimi.dsi.fastutil.longs.LongObjectImmutablePair; -import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.network.MessagingService; import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory; import org.apache.ignite.internal.partition.replicator.network.message.GetEstimatedSizeWithLastModifiedTsRequest; import org.apache.ignite.internal.partition.replicator.network.message.GetEstimatedSizeWithLastModifiedTsResponse; import org.apache.ignite.internal.placementdriver.PlacementDriver; import org.apache.ignite.internal.placementdriver.ReplicaMeta; +import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.table.InternalTable; +import org.apache.ignite.lang.ErrorGroups.Common; import org.jetbrains.annotations.Nullable; public class StatisticAggregator { @@ -47,45 +47,46 @@ public class StatisticAggregator { * * @return Estimated size of this table with last modification timestamp. */ - public CompletableFuture<LongObjectImmutablePair<HybridTimestamp>> estimatedSizeWithLastUpdate(InternalTable table) { + CompletableFuture<LongObjectImmutablePair<HybridTimestamp>> estimatedSizeWithLastUpdate(InternalTable table) { assert messagingService != null; int partitions = table.partitions(); - Map<Integer, String> peers = new HashMap<>(); + Map<Integer, String> peers = newHashMap(partitions); for (int p = 0; p < partitions; ++p) { + ReplicationGroupId replicationGroupId = table.targetReplicationGroupId(p); + ReplicaMeta repl = placementDriver.getCurrentPrimaryReplica( - table.targetReplicationGroupId(p), currentClock.get()); + replicationGroupId, currentClock.get()); - if (repl != null) { + if (repl != null && repl.getLeaseholder() != null) { peers.put(p, repl.getLeaseholder()); } else { - //assert false; // !!! delete + System.err.println("!!!! Failed to get the primary replica"); + CompletableFuture.failedFuture(new IgniteInternalException(REPLICA_UNAVAILABLE_ERR, "Failed to get the primary replica" + + " [replicationGroupId=" + replicationGroupId + ']')); } } if (peers.isEmpty()) { - return CompletableFuture.completedFuture(LongObjectImmutablePair.of(0, HybridTimestamp.MIN_VALUE)); + throw new IgniteInternalException(Common.INTERNAL_ERR, "Table peers are not available" + + " [tableId=" + table.tableId() + ']'); } -/* GetEstimatedSizeWithLastModifiedTsRequest request = PARTITION_REPLICATION_MESSAGES_FACTORY.getEstimatedSizeWithLastModifiedTsRequest() - .tableId(table.tableId()).build();*/ - CompletableFuture<LongObjectImmutablePair<HybridTimestamp>>[] invokeFutures = peers.entrySet().stream() .map(ent -> { GetEstimatedSizeWithLastModifiedTsRequest request = PARTITION_REPLICATION_MESSAGES_FACTORY.getEstimatedSizeWithLastModifiedTsRequest() .tableId(table.tableId()).partitionId(ent.getKey()).build(); return messagingService.invoke(ent.getValue(), request, REQUEST_ESTIMATION_TIMEOUT_MILLIS) - .thenApply(response -> { - assert response instanceof GetEstimatedSizeWithLastModifiedTsResponse : response; + .thenApply(networkMessage -> { + assert networkMessage instanceof GetEstimatedSizeWithLastModifiedTsResponse : networkMessage; - GetEstimatedSizeWithLastModifiedTsResponse response0 = (GetEstimatedSizeWithLastModifiedTsResponse) response; + GetEstimatedSizeWithLastModifiedTsResponse response = (GetEstimatedSizeWithLastModifiedTsResponse) networkMessage; - return LongObjectImmutablePair.of(response0.estimatedSize(), response0.ts()); - }) - .exceptionally(unused -> LongObjectImmutablePair.of(0, HybridTimestamp.MIN_VALUE)); + return LongObjectImmutablePair.of(response.estimatedSize(), response.lastModified()); + }); }) .toArray(CompletableFuture[]::new);; @@ -93,7 +94,7 @@ public class StatisticAggregator { HybridTimestamp last = HybridTimestamp.MIN_VALUE; long count = 0L; - System.err.println("invokeFutures size: " + invokeFutures.length); + System.err.println("!!!! invokeFutures " + invokeFutures.length); for (CompletableFuture<LongObjectImmutablePair<HybridTimestamp>> requestFut : invokeFutures) { LongObjectImmutablePair<HybridTimestamp> result = requestFut.join(); @@ -106,10 +107,8 @@ public class StatisticAggregator { } } count += result.keyLong(); - System.err.println("!!!!! requestFut " + result.keyLong()); } - System.err.println("!!!!! requestFut final:" + count); return LongObjectImmutablePair.of(count, last); }); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterHandler.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterHandler.java index e3470ea84cc..de14d1d0e78 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterHandler.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterHandler.java @@ -70,7 +70,7 @@ public class PartitionModificationCounterHandler { sender, PARTITION_REPLICATION_MESSAGES_FACTORY .getEstimatedSizeWithLastModifiedTsResponse().estimatedSize(partitionSizeSupplier.getAsLong()) - .ts(lastMilestoneTimestamp()).build(), + .lastModified(lastMilestoneTimestamp()).build(), correlationId ); }
