This is an automated email from the ASF dual-hosted git repository. zstan pushed a commit to branch ignite-26069 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 9988d1a8fbac5a30b86c012dc8f7ca63c426242b Author: zstan <[email protected]> AuthorDate: Sun Oct 5 19:05:30 2025 +0300 fix --- .../sql/engine/statistic/ItStatisticTest.java | 8 +++--- .../internal/sql/engine/SqlQueryProcessor.java | 3 +- .../sql/engine/statistic/StatisticAggregator.java | 33 ++++++---------------- .../distributed/PartitionModificationCounter.java | 2 ++ 4 files changed, 15 insertions(+), 31 deletions(-) diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java index cf5d529c581..258f87c0105 100644 --- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java +++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java @@ -39,20 +39,20 @@ import org.junit.jupiter.api.Test; public class ItStatisticTest extends BaseSqlIntegrationTest { private SqlStatisticManagerImpl sqlStatisticManager; - private static final String SINGLE_PART_ZONE = "zone_single_partition"; + private static final String TWO_PART_ZONE = "zone_single_partition"; @BeforeAll void beforeAll() { sqlStatisticManager = (SqlStatisticManagerImpl) queryProcessor().sqlStatisticManager(); - sql(format("CREATE ZONE {} (PARTITIONS 1, REPLICAS 1) storage profiles ['default'];", SINGLE_PART_ZONE)); - sql(format("CREATE TABLE t(ID INTEGER PRIMARY KEY, VAL INTEGER) ZONE {}", SINGLE_PART_ZONE)); + sql(format("CREATE ZONE {} (PARTITIONS 2, REPLICAS 2) storage profiles ['default'];", TWO_PART_ZONE)); + sql(format("CREATE TABLE t(ID INTEGER PRIMARY KEY, VAL INTEGER) ZONE {}", TWO_PART_ZONE)); } @AfterAll void afterAll() { sql("DROP TABLE IF EXISTS t;"); - sql(format("DROP ZONE IF EXISTS {}", SINGLE_PART_ZONE)); + sql(format("DROP ZONE IF EXISTS {}", TWO_PART_ZONE)); } @AfterEach diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java index 936372db7aa..eb42eceaf15 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java @@ -255,7 +255,7 @@ public class SqlQueryProcessor implements QueryProcessor, SystemViewProvider { this.killCommandHandler = killCommandHandler; this.eventLog = eventLog; - statAggregator = new StatisticAggregator(placementDriver, clockService::current, tableManager, clusterSrvc.topologyService()); + statAggregator = new StatisticAggregator(placementDriver, clockService::current); sqlStatisticManager = new SqlStatisticManagerImpl(tableManager, catalogManager, lowWaterMark, commonScheduler, statAggregator); sqlSchemaManager = new SqlSchemaManagerImpl( catalogManager, @@ -312,7 +312,6 @@ public class SqlQueryProcessor implements QueryProcessor, SystemViewProvider { )); statAggregator.messaging(clusterSrvc.messagingService()); - statAggregator.nodeName(nodeName); var exchangeService = registerService(new ExchangeServiceImpl( mailboxRegistry, diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java index 6f366d2d6c2..3dc87277285 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java @@ -4,53 +4,34 @@ import static java.util.concurrent.CompletableFuture.allOf; import it.unimi.dsi.fastutil.longs.LongObjectImmutablePair; import java.util.HashSet; -import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import org.apache.ignite.internal.hlc.HybridTimestamp; -import org.apache.ignite.internal.logger.IgniteLogger; -import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.network.MessagingService; -import org.apache.ignite.internal.network.TopologyService; import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory; import org.apache.ignite.internal.partition.replicator.network.message.GetEstimatedSizeWithLastModifiedTsRequest; import org.apache.ignite.internal.partition.replicator.network.message.GetEstimatedSizeWithLastModifiedTsResponse; import org.apache.ignite.internal.placementdriver.PlacementDriver; import org.apache.ignite.internal.placementdriver.ReplicaMeta; -import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory; import org.apache.ignite.internal.table.InternalTable; -import org.apache.ignite.internal.table.distributed.TableManager; import org.jetbrains.annotations.Nullable; public class StatisticAggregator { - private static final IgniteLogger LOG = Loggers.forClass(StatisticAggregator.class); private final PlacementDriver placementDriver; private final Supplier<HybridTimestamp> currentClock; private @Nullable MessagingService messagingService; - private @Nullable String nodeName; - private final TableManager tableManager; - private static final SqlQueryMessagesFactory MSG_FACTORY = new SqlQueryMessagesFactory(); private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY = new PartitionReplicationMessagesFactory(); - private TopologyService topologyService; private static final long REQUEST_ESTIMATION_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(3); public StatisticAggregator( PlacementDriver placementDriver, - Supplier<HybridTimestamp> currentClock, - TableManager tableManager, - TopologyService topologyService + Supplier<HybridTimestamp> currentClock ) { this.placementDriver = placementDriver; this.currentClock = currentClock; - this.tableManager = tableManager; - this.topologyService = topologyService; - } - - public void nodeName(String nodeName) { - this.nodeName = nodeName; } public void messaging(MessagingService messagingService) { @@ -88,8 +69,8 @@ public class StatisticAggregator { .tableId(table.tableId()).build(); CompletableFuture<LongObjectImmutablePair<HybridTimestamp>>[] invokeFutures = peers.stream() - .map(topologyService::getByConsistentId) - .filter(Objects::nonNull) + //.map(topologyService::getByConsistentId) + //.filter(Objects::nonNull) .map(node -> messagingService .invoke(node, request, REQUEST_ESTIMATION_TIMEOUT_MILLIS) .thenApply(response -> { @@ -106,9 +87,9 @@ public class StatisticAggregator { HybridTimestamp last = HybridTimestamp.MIN_VALUE; long count = 0L; - for (CompletableFuture<?> fut : invokeFutures) { - CompletableFuture<LongObjectImmutablePair<HybridTimestamp>> requestFut = - (CompletableFuture<LongObjectImmutablePair<HybridTimestamp>>) fut; + System.err.println("invokeFutures size: " + invokeFutures.length); + + for (CompletableFuture<LongObjectImmutablePair<HybridTimestamp>> requestFut : invokeFutures) { LongObjectImmutablePair<HybridTimestamp> result = requestFut.join(); if (last == null) { @@ -119,8 +100,10 @@ public class StatisticAggregator { } } count += result.keyLong(); + System.err.println("!!!!! requestFut " + result.keyLong()); } + System.err.println("!!!!! requestFut final:" + count); return LongObjectImmutablePair.of(count, last); }); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java index 9c6c1743369..46eb979e1cc 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java @@ -104,6 +104,8 @@ public class PartitionModificationCounter { ) { long estSize = estimateSize.getAsLong(); + System.err.println("!!! handleRequestCounter"); + if (tableId == message.tableId() && estSize != -1) { messagingService.respond( sender,
