This is an automated email from the ASF dual-hosted git repository. zstan pushed a commit to branch ignite-26069 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 60de0fec9c71ba8094bb9780fc5a98265204acb3 Author: zstan <[email protected]> AuthorDate: Wed Oct 8 08:08:36 2025 +0300 styles --- .../ignite/client/fakes/FakeInternalTable.java | 6 - .../network/PartitionReplicationMessageGroup.java | 4 +- .../replication/GetEstimatedSizeRequest.java | 1 - .../sql/engine/statistic/ItStatisticTest.java | 21 +--- .../internal/sql/engine/SqlQueryProcessor.java | 9 +- .../engine/statistic/SqlStatisticManagerImpl.java | 29 +++-- .../sql/engine/statistic/StatisticAggregator.java | 134 ++++----------------- ...ggregator.java => StatisticAggregatorImpl.java} | 67 +++++++---- .../statistic/SqlStatisticManagerImplTest.java | 84 +++++++++---- .../ignite/internal/table/InternalTable.java | 1 - .../distributed/PartitionModificationCounter.java | 1 - .../PartitionModificationCounterHandler.java | 2 + .../replicator/PartitionReplicaListener.java | 1 - .../distributed/storage/InternalTableImpl.java | 70 ----------- 14 files changed, 155 insertions(+), 275 deletions(-) diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java index a0cf434150f..381b8de9bca 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java @@ -23,7 +23,6 @@ import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedF import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture; -import it.unimi.dsi.fastutil.longs.LongObjectImmutablePair; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.BitSet; @@ -543,11 +542,6 @@ public class FakeInternalTable implements InternalTable, StreamerReceiverRunner throw new IgniteInternalException(new OperationNotSupportedException()); } - @Override - public CompletableFuture<LongObjectImmutablePair<HybridTimestamp>> estimatedSizeWithLastUpdate() { - throw new IgniteInternalException(new OperationNotSupportedException()); - } - @Override public StreamerReceiverRunner streamerReceiverRunner() { return this; diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java index 1eac459b0ec..f5494cff6a4 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java @@ -38,6 +38,8 @@ import org.apache.ignite.internal.partition.replicator.network.disaster.LocalPar import org.apache.ignite.internal.partition.replicator.network.disaster.LocalTablePartitionStateMessage; import org.apache.ignite.internal.partition.replicator.network.disaster.LocalTablePartitionStateRequest; import org.apache.ignite.internal.partition.replicator.network.disaster.LocalTablePartitionStateResponse; +import org.apache.ignite.internal.partition.replicator.network.message.GetEstimatedSizeWithLastModifiedTsRequest; +import org.apache.ignite.internal.partition.replicator.network.message.GetEstimatedSizeWithLastModifiedTsResponse; import org.apache.ignite.internal.partition.replicator.network.message.HasDataRequest; import org.apache.ignite.internal.partition.replicator.network.message.HasDataResponse; import org.apache.ignite.internal.partition.replicator.network.raft.PartitionSnapshotMeta; @@ -53,8 +55,6 @@ import org.apache.ignite.internal.partition.replicator.network.replication.Binar import org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest; import org.apache.ignite.internal.partition.replicator.network.replication.ChangePeersAndLearnersAsyncReplicaRequest; import org.apache.ignite.internal.partition.replicator.network.replication.GetEstimatedSizeRequest; -import org.apache.ignite.internal.partition.replicator.network.message.GetEstimatedSizeWithLastModifiedTsRequest; -import org.apache.ignite.internal.partition.replicator.network.message.GetEstimatedSizeWithLastModifiedTsResponse; import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyDirectMultiRowReplicaRequest; import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyDirectSingleRowReplicaRequest; import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyMultiRowPkReplicaRequest; diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeRequest.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeRequest.java index 72f36259955..de7de4b998e 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeRequest.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeRequest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.partition.replicator.network.replication; -import org.apache.ignite.internal.network.NetworkMessage; import org.apache.ignite.internal.network.annotations.Transferable; import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup; import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest; diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java index ddba533b966..146c4fc0e6b 100644 --- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java +++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java @@ -39,21 +39,16 @@ import org.junit.jupiter.api.Test; public class ItStatisticTest extends BaseSqlIntegrationTest { private SqlStatisticManagerImpl sqlStatisticManager; - private static final String TWO_REPL_ZONE = "zone1"; - @BeforeAll void beforeAll() { sqlStatisticManager = (SqlStatisticManagerImpl) queryProcessor().sqlStatisticManager(); - sql(format("CREATE ZONE {} (REPLICAS 2) storage profiles ['default'];", TWO_REPL_ZONE)); - sql(format("CREATE TABLE t(ID INTEGER PRIMARY KEY, VAL INTEGER) ZONE {}", TWO_REPL_ZONE)); - //sql("CREATE TABLE t(ID INTEGER PRIMARY KEY, VAL INTEGER)"); + sql("CREATE TABLE t(ID INTEGER PRIMARY KEY, VAL INTEGER)"); } @AfterAll void afterAll() { sql("DROP TABLE IF EXISTS t;"); - sql(format("DROP ZONE IF EXISTS {}", TWO_REPL_ZONE)); } @AfterEach @@ -61,18 +56,13 @@ public class ItStatisticTest extends BaseSqlIntegrationTest { sql("DELETE FROM t;"); } - /*@Override - protected int initialNodes() { // change !!! - return 1; - }*/ - @Test public void testTableSizeUpdates() throws InterruptedException { long milestone1 = computeNextMilestone(0, DEFAULT_STALE_ROWS_FRACTION, DEFAULT_MIN_STALE_ROWS_COUNT); String selectQuery = "select * from t"; - insert(0, milestone1 - 1); + insert(0, milestone1); sql(selectQuery); @@ -98,7 +88,7 @@ public class ItStatisticTest extends BaseSqlIntegrationTest { sql(selectQuery); - insert(0, milestone1 - 1); + insert(0, milestone1); sqlStatisticManager.forceUpdateAll(); sqlStatisticManager.lastUpdateStatisticFuture().join(); @@ -110,7 +100,7 @@ public class ItStatisticTest extends BaseSqlIntegrationTest { long milestone2 = computeNextMilestone(milestone1, DEFAULT_STALE_ROWS_FRACTION, DEFAULT_MIN_STALE_ROWS_COUNT); - insert(milestone1, milestone1 + milestone2 - 1); + insert(milestone1, milestone1 + milestone2); sqlStatisticManager.forceUpdateAll(); sqlStatisticManager.lastUpdateStatisticFuture().join(); @@ -130,7 +120,8 @@ public class ItStatisticTest extends BaseSqlIntegrationTest { return Math.max((long) (currentSize * staleRowsFraction), minStaleRowsCount); } + /** Inclusively 'from', exclusively 'to' bounds. */ private static void insert(long from, long to) { - sql("INSERT INTO t SELECT x, x FROM system_range(?, ?)", from, to); + sql("INSERT INTO t SELECT x, x FROM system_range(?, ?)", from, to - 1); } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java index eb42eceaf15..c4be6935a38 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java @@ -97,7 +97,7 @@ import org.apache.ignite.internal.sql.engine.sql.ParsedResult; import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl; import org.apache.ignite.internal.sql.engine.statistic.SqlStatisticManager; import org.apache.ignite.internal.sql.engine.statistic.SqlStatisticManagerImpl; -import org.apache.ignite.internal.sql.engine.statistic.StatisticAggregator; +import org.apache.ignite.internal.sql.engine.statistic.StatisticAggregatorImpl; import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext; import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContextImpl; import org.apache.ignite.internal.sql.engine.util.Commons; @@ -206,8 +206,6 @@ public class SqlQueryProcessor implements QueryProcessor, SystemViewProvider { private final EventLog eventLog; - private final StatisticAggregator statAggregator; - /** Constructor. */ public SqlQueryProcessor( ClusterService clusterSrvc, @@ -255,7 +253,8 @@ public class SqlQueryProcessor implements QueryProcessor, SystemViewProvider { this.killCommandHandler = killCommandHandler; this.eventLog = eventLog; - statAggregator = new StatisticAggregator(placementDriver, clockService::current); + StatisticAggregatorImpl statAggregator = + new StatisticAggregatorImpl(placementDriver, clockService::current, clusterSrvc.messagingService()); sqlStatisticManager = new SqlStatisticManagerImpl(tableManager, catalogManager, lowWaterMark, commonScheduler, statAggregator); sqlSchemaManager = new SqlSchemaManagerImpl( catalogManager, @@ -311,8 +310,6 @@ public class SqlQueryProcessor implements QueryProcessor, SystemViewProvider { clockService )); - statAggregator.messaging(clusterSrvc.messagingService()); - var exchangeService = registerService(new ExchangeServiceImpl( mailboxRegistry, msgSrvc, diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java index 88209ef33c7..45b0bbd76e8 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.sql.engine.statistic; import static org.apache.ignite.internal.event.EventListener.fromConsumer; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; +import it.unimi.dsi.fastutil.longs.LongObjectImmutablePair; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -43,6 +44,7 @@ import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.lowwatermark.LowWatermark; import org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventParameters; import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent; +import org.apache.ignite.internal.table.InternalTable; import org.apache.ignite.internal.table.LongPriorityQueue; import org.apache.ignite.internal.table.TableViewInternal; import org.apache.ignite.internal.table.distributed.TableManager; @@ -76,10 +78,10 @@ public class SqlStatisticManagerImpl implements SqlStatisticManager { Set<Integer> droppedTables = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final ScheduledExecutorService scheduler; - private final StatisticAggregator statAggregator; + private final StatisticAggregator<InternalTable, CompletableFuture<LongObjectImmutablePair<HybridTimestamp>>> statSupplier; static final long INITIAL_DELAY = 5_000; - static final long REFRESH_PERIOD = 5_000; + static final long REFRESH_PERIOD = 20_000; /** Constructor. */ public SqlStatisticManagerImpl( @@ -87,17 +89,17 @@ public class SqlStatisticManagerImpl implements SqlStatisticManager { CatalogService catalogService, LowWatermark lowWatermark, ScheduledExecutorService scheduler, - StatisticAggregator statAggregator + StatisticAggregator<InternalTable, CompletableFuture<LongObjectImmutablePair<HybridTimestamp>>> statSupplier ) { this.tableManager = tableManager; this.catalogService = catalogService; this.lowWatermark = lowWatermark; this.scheduler = scheduler; - this.statAggregator = statAggregator; + this.statSupplier = statSupplier; } /** - * Returns approximate number of rows in table by their id. + * Returns approximate number of rows in table. * * <p>Returns the previous known value or {@value SqlStatisticManagerImpl#DEFAULT_TABLE_SIZE} as default value. Can start process to * update asked statistics in background to have updated values for future requests. @@ -129,12 +131,13 @@ public class SqlStatisticManagerImpl implements SqlStatisticManager { } private void update() { - System.err.println("!!!! update call"); + if (!latestUpdateFut.get().isDone()) { + return; + } + for (Map.Entry<Integer, ActualSize> ent : tableSizeMap.entrySet()) { Integer tableId = ent.getKey(); - System.err.println("!!!! update call " + tableId); - if (droppedTables.contains(tableId)) { continue; } @@ -146,7 +149,11 @@ public class SqlStatisticManagerImpl implements SqlStatisticManager { continue; } - CompletableFuture<Void> updateResult = statAggregator.estimatedSizeWithLastUpdate(tableView.internalTable()) + CompletableFuture<Void> updateResult = statSupplier.estimatedSizeWithLastUpdate(tableView.internalTable()) + .exceptionally(e -> { + LOG.debug("Can't calculate size for table [id={}].", e, tableId); + return null; + }) .thenAccept(res -> { // the table can be concurrently dropped and we shouldn't put new value in this case. tableSizeMap.computeIfPresent(tableId, (k, v) -> { @@ -157,10 +164,6 @@ public class SqlStatisticManagerImpl implements SqlStatisticManager { return new ActualSize(Math.max(res.keyLong(), DEFAULT_TABLE_SIZE), res.value()); }); - }).exceptionally(e -> { - System.err.println("Can't calculate size for table: " + tableId); - LOG.info("Can't calculate size for table [id={}].", e, tableId); - return null; }); latestUpdateFut.updateAndGet(prev -> prev == null ? updateResult : prev.thenCompose(none -> updateResult)); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java index 46e8470f756..5a72dab336c 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java @@ -1,115 +1,25 @@ -package org.apache.ignite.internal.sql.engine.statistic; - -import static java.util.concurrent.CompletableFuture.allOf; -import static org.apache.ignite.internal.util.IgniteUtils.newHashMap; -import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR; - -import it.unimi.dsi.fastutil.longs.LongObjectImmutablePair; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; -import org.apache.ignite.internal.hlc.HybridTimestamp; -import org.apache.ignite.internal.lang.IgniteInternalException; -import org.apache.ignite.internal.network.MessagingService; -import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory; -import org.apache.ignite.internal.partition.replicator.network.message.GetEstimatedSizeWithLastModifiedTsRequest; -import org.apache.ignite.internal.partition.replicator.network.message.GetEstimatedSizeWithLastModifiedTsResponse; -import org.apache.ignite.internal.placementdriver.PlacementDriver; -import org.apache.ignite.internal.placementdriver.ReplicaMeta; -import org.apache.ignite.internal.replicator.ReplicationGroupId; -import org.apache.ignite.internal.table.InternalTable; -import org.apache.ignite.lang.ErrorGroups.Common; -import org.jetbrains.annotations.Nullable; - -public class StatisticAggregator { - private final PlacementDriver placementDriver; - private final Supplier<HybridTimestamp> currentClock; - private @Nullable MessagingService messagingService; - private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY = - new PartitionReplicationMessagesFactory(); - private static final long REQUEST_ESTIMATION_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(3); - - public StatisticAggregator( - PlacementDriver placementDriver, - Supplier<HybridTimestamp> currentClock - ) { - this.placementDriver = placementDriver; - this.currentClock = currentClock; - } - - public void messaging(MessagingService messagingService) { - this.messagingService = messagingService; - } - - /** - * Returns the pair<<em>last modification timestamp</em>, <em>estimated size</em>> of this table. - * - * @return Estimated size of this table with last modification timestamp. - */ - CompletableFuture<LongObjectImmutablePair<HybridTimestamp>> estimatedSizeWithLastUpdate(InternalTable table) { - assert messagingService != null; - - int partitions = table.partitions(); - - Map<Integer, String> peers = newHashMap(partitions); - - for (int p = 0; p < partitions; ++p) { - ReplicationGroupId replicationGroupId = table.targetReplicationGroupId(p); - - ReplicaMeta repl = placementDriver.getCurrentPrimaryReplica( - replicationGroupId, currentClock.get()); +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ - if (repl != null && repl.getLeaseholder() != null) { - peers.put(p, repl.getLeaseholder()); - } else { - System.err.println("!!!! Failed to get the primary replica"); - CompletableFuture.failedFuture(new IgniteInternalException(REPLICA_UNAVAILABLE_ERR, "Failed to get the primary replica" - + " [replicationGroupId=" + replicationGroupId + ']')); - } - } - - if (peers.isEmpty()) { - throw new IgniteInternalException(Common.INTERNAL_ERR, "Table peers are not available" - + " [tableId=" + table.tableId() + ']'); - } - - CompletableFuture<LongObjectImmutablePair<HybridTimestamp>>[] invokeFutures = peers.entrySet().stream() - .map(ent -> { - GetEstimatedSizeWithLastModifiedTsRequest request = PARTITION_REPLICATION_MESSAGES_FACTORY.getEstimatedSizeWithLastModifiedTsRequest() - .tableId(table.tableId()).partitionId(ent.getKey()).build(); - - return messagingService.invoke(ent.getValue(), request, REQUEST_ESTIMATION_TIMEOUT_MILLIS) - .thenApply(networkMessage -> { - assert networkMessage instanceof GetEstimatedSizeWithLastModifiedTsResponse : networkMessage; - - GetEstimatedSizeWithLastModifiedTsResponse response = (GetEstimatedSizeWithLastModifiedTsResponse) networkMessage; - - return LongObjectImmutablePair.of(response.estimatedSize(), response.lastModified()); - }); - }) - .toArray(CompletableFuture[]::new);; - - return allOf(invokeFutures).thenApply(unused -> { - HybridTimestamp last = HybridTimestamp.MIN_VALUE; - long count = 0L; - - System.err.println("!!!! invokeFutures " + invokeFutures.length); - - for (CompletableFuture<LongObjectImmutablePair<HybridTimestamp>> requestFut : invokeFutures) { - LongObjectImmutablePair<HybridTimestamp> result = requestFut.join(); - - if (last == null) { - last = result.value(); - } else { - if (result.value().compareTo(last) > 0) { - last = result.value(); - } - } - count += result.keyLong(); - } +package org.apache.ignite.internal.sql.engine.statistic; - return LongObjectImmutablePair.of(count, last); - }); - } +/** Statistic aggregator. */ +@FunctionalInterface +public interface StatisticAggregator<T, R> { + /** Estimated size and last value update. */ + R estimatedSizeWithLastUpdate(T t); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregatorImpl.java similarity index 64% copy from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java copy to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregatorImpl.java index 46e8470f756..de0bd3917ff 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregatorImpl.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.ignite.internal.sql.engine.statistic; import static java.util.concurrent.CompletableFuture.allOf; @@ -20,25 +37,25 @@ import org.apache.ignite.internal.placementdriver.ReplicaMeta; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.table.InternalTable; import org.apache.ignite.lang.ErrorGroups.Common; -import org.jetbrains.annotations.Nullable; -public class StatisticAggregator { +/** Statistic aggregator. */ +public class StatisticAggregatorImpl implements + StatisticAggregator<InternalTable, CompletableFuture<LongObjectImmutablePair<HybridTimestamp>>> { private final PlacementDriver placementDriver; private final Supplier<HybridTimestamp> currentClock; - private @Nullable MessagingService messagingService; + private final MessagingService messagingService; private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY = new PartitionReplicationMessagesFactory(); private static final long REQUEST_ESTIMATION_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(3); - public StatisticAggregator( + /** Constructor. */ + public StatisticAggregatorImpl( PlacementDriver placementDriver, - Supplier<HybridTimestamp> currentClock + Supplier<HybridTimestamp> currentClock, + MessagingService messagingService ) { this.placementDriver = placementDriver; this.currentClock = currentClock; - } - - public void messaging(MessagingService messagingService) { this.messagingService = messagingService; } @@ -47,7 +64,8 @@ public class StatisticAggregator { * * @return Estimated size of this table with last modification timestamp. */ - CompletableFuture<LongObjectImmutablePair<HybridTimestamp>> estimatedSizeWithLastUpdate(InternalTable table) { + @Override + public CompletableFuture<LongObjectImmutablePair<HybridTimestamp>> estimatedSizeWithLastUpdate(InternalTable table) { assert messagingService != null; int partitions = table.partitions(); @@ -63,27 +81,29 @@ public class StatisticAggregator { if (repl != null && repl.getLeaseholder() != null) { peers.put(p, repl.getLeaseholder()); } else { - System.err.println("!!!! Failed to get the primary replica"); - CompletableFuture.failedFuture(new IgniteInternalException(REPLICA_UNAVAILABLE_ERR, "Failed to get the primary replica" + return CompletableFuture.failedFuture( + new IgniteInternalException(REPLICA_UNAVAILABLE_ERR, "Failed to get the primary replica" + " [replicationGroupId=" + replicationGroupId + ']')); } } if (peers.isEmpty()) { - throw new IgniteInternalException(Common.INTERNAL_ERR, "Table peers are not available" - + " [tableId=" + table.tableId() + ']'); + return CompletableFuture.failedFuture(new IgniteInternalException(Common.INTERNAL_ERR, "Table peers are not available" + + " [tableId=" + table.tableId() + ']')); } CompletableFuture<LongObjectImmutablePair<HybridTimestamp>>[] invokeFutures = peers.entrySet().stream() .map(ent -> { - GetEstimatedSizeWithLastModifiedTsRequest request = PARTITION_REPLICATION_MESSAGES_FACTORY.getEstimatedSizeWithLastModifiedTsRequest() + GetEstimatedSizeWithLastModifiedTsRequest request = + PARTITION_REPLICATION_MESSAGES_FACTORY.getEstimatedSizeWithLastModifiedTsRequest() .tableId(table.tableId()).partitionId(ent.getKey()).build(); return messagingService.invoke(ent.getValue(), request, REQUEST_ESTIMATION_TIMEOUT_MILLIS) .thenApply(networkMessage -> { assert networkMessage instanceof GetEstimatedSizeWithLastModifiedTsResponse : networkMessage; - GetEstimatedSizeWithLastModifiedTsResponse response = (GetEstimatedSizeWithLastModifiedTsResponse) networkMessage; + GetEstimatedSizeWithLastModifiedTsResponse response + = (GetEstimatedSizeWithLastModifiedTsResponse) networkMessage; return LongObjectImmutablePair.of(response.estimatedSize(), response.lastModified()); }); @@ -94,19 +114,14 @@ public class StatisticAggregator { HybridTimestamp last = HybridTimestamp.MIN_VALUE; long count = 0L; - System.err.println("!!!! invokeFutures " + invokeFutures.length); - for (CompletableFuture<LongObjectImmutablePair<HybridTimestamp>> requestFut : invokeFutures) { - LongObjectImmutablePair<HybridTimestamp> result = requestFut.join(); - - if (last == null) { - last = result.value(); - } else { - if (result.value().compareTo(last) > 0) { - last = result.value(); - } + LongObjectImmutablePair<HybridTimestamp> partitionState = requestFut.join(); + + if (partitionState.value().compareTo(last) > 0) { + last = partitionState.value(); } - count += result.keyLong(); + + count += partitionState.keyLong(); } return LongObjectImmutablePair.of(count, last); diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImplTest.java index cd7cd58d8ba..0882f6a4698 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImplTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImplTest.java @@ -55,8 +55,8 @@ import org.apache.ignite.internal.table.distributed.TableManager; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.testframework.ExecutorServiceExtension; import org.apache.ignite.internal.testframework.InjectExecutorService; -import org.awaitility.Awaitility; import org.apache.ignite.sql.ColumnType; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; @@ -84,6 +84,9 @@ class SqlStatisticManagerImplTest extends BaseIgniteAbstractTest { @Mock private LowWatermark lowWatermark; + @Mock + StatisticAggregatorImpl statAggregator; + @InjectExecutorService private ScheduledExecutorService commonExecutor; @@ -97,7 +100,7 @@ class SqlStatisticManagerImplTest extends BaseIgniteAbstractTest { when(catalogManager.catalog(anyInt())).thenReturn(mock(Catalog.class)); SqlStatisticManagerImpl sqlStatisticManager = - new SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark, commonExecutor); + new SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark, commonExecutor, statAggregator); sqlStatisticManager.start(); // Test: @@ -114,11 +117,11 @@ class SqlStatisticManagerImplTest extends BaseIgniteAbstractTest { when(tableManager.cachedTable(tableId)).thenReturn(tableViewInternal); when(tableViewInternal.internalTable()).thenReturn(internalTable); - when(internalTable.estimatedSizeWithLastUpdate()) + when(statAggregator.estimatedSizeWithLastUpdate(internalTable)) .thenReturn(CompletableFuture.completedFuture(LongObjectImmutablePair.of(tableSize, HybridTimestamp.MAX_VALUE))); SqlStatisticManagerImpl sqlStatisticManager = - new SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark, commonExecutor); + new SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark, commonExecutor, statAggregator); sqlStatisticManager.start(); long timeout = 2 * Math.max(REFRESH_PERIOD, INITIAL_DELAY); @@ -130,7 +133,7 @@ class SqlStatisticManagerImplTest extends BaseIgniteAbstractTest { // Second time we should obtain the same value from a cache. assertEquals(tableSize, sqlStatisticManager.tableSize(tableId)); - verify(internalTable, times(1)).estimatedSizeWithLastUpdate(); + verify(statAggregator, times(1)).estimatedSizeWithLastUpdate(internalTable); } @Test @@ -147,14 +150,14 @@ class SqlStatisticManagerImplTest extends BaseIgniteAbstractTest { when(tableManager.cachedTable(tableId)).thenReturn(tableViewInternal); when(tableViewInternal.internalTable()).thenReturn(internalTable); - when(internalTable.estimatedSizeWithLastUpdate()) + when(statAggregator.estimatedSizeWithLastUpdate(internalTable)) .thenReturn( CompletableFuture.completedFuture(LongObjectImmutablePair.of(tableSize1, time1)), CompletableFuture.completedFuture(LongObjectImmutablePair.of(tableSize2, time2)) ); SqlStatisticManagerImpl sqlStatisticManager = - new SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark, commonExecutor); + new SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark, commonExecutor, statAggregator); sqlStatisticManager.start(); long timeout = 2 * Math.max(REFRESH_PERIOD, INITIAL_DELAY); @@ -171,7 +174,7 @@ class SqlStatisticManagerImplTest extends BaseIgniteAbstractTest { assertEquals(tableSize2, sqlStatisticManager.tableSize(tableId)); assertEquals(time2, sqlStatisticManager.tableSizeMap.get(tableId).getTimestamp()); - verify(internalTable, times(2)).estimatedSizeWithLastUpdate(); + verify(statAggregator, times(2)).estimatedSizeWithLastUpdate(internalTable); } @Test @@ -213,11 +216,11 @@ class SqlStatisticManagerImplTest extends BaseIgniteAbstractTest { when(tableManager.cachedTable(anyInt())).thenReturn(tableViewInternal); when(tableViewInternal.internalTable()).thenReturn(internalTable); - when(internalTable.estimatedSizeWithLastUpdate()) + when(statAggregator.estimatedSizeWithLastUpdate(internalTable)) .thenReturn(CompletableFuture.completedFuture(LongObjectImmutablePair.of(tableSize, HybridTimestamp.MAX_VALUE))); SqlStatisticManagerImpl sqlStatisticManager = - new SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark, commonExecutor); + new SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark, commonExecutor, statAggregator); sqlStatisticManager.start(); sqlStatisticManager.forceUpdateAll(); @@ -253,11 +256,11 @@ class SqlStatisticManagerImplTest extends BaseIgniteAbstractTest { when(tableManager.cachedTable(tableId)).thenReturn(tableViewInternal); when(tableViewInternal.internalTable()).thenReturn(internalTable); - when(internalTable.estimatedSizeWithLastUpdate()) + when(statAggregator.estimatedSizeWithLastUpdate(internalTable)) .thenReturn(CompletableFuture.completedFuture(LongObjectImmutablePair.of(tableSize, HybridTimestamp.MAX_VALUE))); SqlStatisticManagerImpl sqlStatisticManager = - new SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark, commonExecutor); + new SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark, commonExecutor, statAggregator); sqlStatisticManager.start(); long timeout = 2 * Math.max(REFRESH_PERIOD, INITIAL_DELAY); @@ -298,11 +301,11 @@ class SqlStatisticManagerImplTest extends BaseIgniteAbstractTest { when(tableManager.cachedTable(tableId)).thenReturn(tableViewInternal); when(tableViewInternal.internalTable()).thenReturn(internalTable); - when(internalTable.estimatedSizeWithLastUpdate()) + when(statAggregator.estimatedSizeWithLastUpdate(internalTable)) .thenReturn(CompletableFuture.completedFuture(LongObjectImmutablePair.of(tableSize, HybridTimestamp.MAX_VALUE))); SqlStatisticManagerImpl sqlStatisticManager = - new SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark, commonExecutor); + new SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark, commonExecutor, statAggregator); sqlStatisticManager.start(); // Test: @@ -328,6 +331,48 @@ class SqlStatisticManagerImplTest extends BaseIgniteAbstractTest { ); } + @Test + void statisticUpdatesIfEstimationPartiallyUnavailable() { + int tableId = ThreadLocalRandom.current().nextInt(); + long tableSize1 = 100_000L; + long tableSize2 = 500_000L; + + HybridTimestamp time1 = HybridTimestamp.MAX_VALUE.subtractPhysicalTime(1000); + HybridTimestamp time2 = HybridTimestamp.MAX_VALUE.subtractPhysicalTime(500); + // Preparing: + prepareCatalogWithTable(tableId); + + when(statAggregator.estimatedSizeWithLastUpdate(internalTable)).thenReturn( + CompletableFuture.completedFuture(LongObjectImmutablePair.of(tableSize1, time1)), // stale result goes first + CompletableFuture.failedFuture(new RuntimeException("smth wrong")), + CompletableFuture.completedFuture(LongObjectImmutablePair.of(tableSize2, time2)) + ); + + when(tableManager.cachedTable(tableId)).thenReturn(tableViewInternal); + when(tableViewInternal.internalTable()).thenReturn(internalTable); + + SqlStatisticManagerImpl sqlStatisticManager = + new SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark, commonExecutor, statAggregator); + + sqlStatisticManager.start(); + // tableSize1 + sqlStatisticManager.forceUpdateAll(); + + assertEquals(tableSize1, sqlStatisticManager.tableSize(tableId)); + + // err call + sqlStatisticManager.forceUpdateAll(); + + assertEquals(tableSize1, sqlStatisticManager.tableSize(tableId)); + + // tableSize2 + sqlStatisticManager.forceUpdateAll(); + + assertEquals(tableSize2, sqlStatisticManager.tableSize(tableId)); + + verify(statAggregator, times(3)).estimatedSizeWithLastUpdate(internalTable); + } + @Test void ensureStaleRequestsAreDiscarded() { int tableId = ThreadLocalRandom.current().nextInt(); @@ -339,20 +384,19 @@ class SqlStatisticManagerImplTest extends BaseIgniteAbstractTest { // Preparing: prepareCatalogWithTable(tableId); - CompletableFuture<LongObjectImmutablePair<HybridTimestamp>> staleResultFuture = new CompletableFuture<>(); when(tableManager.cachedTable(tableId)).thenReturn(tableViewInternal); when(tableViewInternal.internalTable()).thenReturn(internalTable); - when(internalTable.estimatedSizeWithLastUpdate()).thenReturn( - staleResultFuture, // stale result goes first + when(statAggregator.estimatedSizeWithLastUpdate(internalTable)).thenReturn( + CompletableFuture.completedFuture(LongObjectImmutablePair.of(tableSize1, time1)), // stale result goes first CompletableFuture.completedFuture(LongObjectImmutablePair.of(tableSize2, time2)) ); SqlStatisticManagerImpl sqlStatisticManager = - new SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark, commonExecutor); + new SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark, commonExecutor, statAggregator); sqlStatisticManager.start(); // Test: - // first call results in non-completed future, therefore default size is expected + // first call results in non-completed future, therefore default size is expected assertEquals(DEFAULT_TABLE_SIZE, sqlStatisticManager.tableSize(tableId)); // get stale @@ -366,8 +410,6 @@ class SqlStatisticManagerImplTest extends BaseIgniteAbstractTest { assertEquals(tableSize2, sqlStatisticManager.tableSize(tableId)) ); - staleResultFuture.complete(LongObjectImmutablePair.of(tableSize1, time1)); - // Stale result must be discarded. assertEquals(tableSize2, sqlStatisticManager.tableSize(tableId)); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java index ca757a58232..44bdbadf5c9 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.table; -import it.unimi.dsi.fastutil.longs.LongObjectImmutablePair; import java.util.BitSet; import java.util.Collection; import java.util.List; diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java index e8f0f1e6966..832d7a78675 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java @@ -21,7 +21,6 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongSupplier; import org.apache.ignite.internal.hlc.HybridTimestamp; -import org.apache.ignite.internal.network.MessagingService; /** * Keeps track of the number of modifications of a partition. diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterHandler.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterHandler.java index de14d1d0e78..ea92b41990f 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterHandler.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterHandler.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.partition.replicator.network.PartitionReplicat import org.apache.ignite.internal.partition.replicator.network.message.GetEstimatedSizeWithLastModifiedTsRequest; import org.jetbrains.annotations.Nullable; +/** Partition modification handler. */ public class PartitionModificationCounterHandler { private final PartitionModificationCounter modificationCounter; private final int tableId; @@ -36,6 +37,7 @@ public class PartitionModificationCounterHandler { private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY = new PartitionReplicationMessagesFactory(); + /** Constructor. */ public PartitionModificationCounterHandler( int tableId, int partitionId, diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index 3ca00a2c7da..cfc7b675670 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -58,7 +58,6 @@ import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHE import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR; import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_READ_ONLY_TOO_OLD_ERR; -import it.unimi.dsi.fastutil.longs.LongObjectImmutablePair; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.BitSet; diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java index febfbdd1169..8f418d59651 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java @@ -63,7 +63,6 @@ import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_FAILED_READ_WRI import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; -import it.unimi.dsi.fastutil.longs.LongObjectImmutablePair; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -2211,75 +2210,6 @@ public class InternalTableImpl implements InternalTable { return streamerFlushExecutor.get(); } -/* - @Override - public CompletableFuture<LongObjectImmutablePair<HybridTimestamp>> estimatedSizeWithLastUpdate() { - //return CompletableFuture.completedFuture(null); - - Set<String> peers = new HashSet<>(); - - for (int partId = 0; partId < partitions; partId++) { - ReplicationGroupId replicaGroupId = targetReplicationGroupId(partId); - - ReplicaMeta meta = placementDriver.getCurrentPrimaryReplica(replicaGroupId, clockService.current()); - - if (meta != null) { - peers.add(meta.getLeaseholder()); - } else { - assert false; //!!! remove it - } - } - - GetEstimatedSizeWithLastModifiedTsRequest request = TABLE_MESSAGES_FACTORY.getEstimatedSizeWithLastModifiedTsRequest() - .tableId(tableId).build(); - - for (String leaseHolderName : peers) { - - } - -*/ -/* HybridTimestamp now = clockService.current(); - - var invokeFutures = new CompletableFuture<?>[partitions]; - - for (int partId = 0; partId < partitions; partId++) { - ReplicationGroupId replicaGroupId = targetReplicationGroupId(partId); - ReplicationGroupIdMessage partitionIdMessage = serializeReplicationGroupId(replicaGroupId); - - Function<ReplicaMeta, ReplicaRequest> requestFactory = replicaMeta -> - TABLE_MESSAGES_FACTORY.getEstimatedSizeWithLastModifiedTsRequest() - .groupId(partitionIdMessage) - .tableId(tableId) - .enlistmentConsistencyToken(enlistmentConsistencyToken(replicaMeta)) - .timestamp(now) - .build(); - - invokeFutures[partId] = sendToPrimaryWithRetry(replicaGroupId, now, 5, requestFactory); - } - - return allOf(invokeFutures).thenApply(v -> { - HybridTimestamp last = null; - long count = 0L; - for (CompletableFuture<?> fut : invokeFutures) { - CompletableFuture<LongObjectImmutablePair<HybridTimestamp>> requestFut = - (CompletableFuture<LongObjectImmutablePair<HybridTimestamp>>) fut; - LongObjectImmutablePair<HybridTimestamp> result = requestFut.join(); - - if (last == null) { - last = result.value(); - } else { - if (result.value().compareTo(last) > 0) { - last = result.value(); - } - } - count += result.keyLong(); - } - return LongObjectImmutablePair.of(count, last); - });*//* - - } -*/ - @Override public CompletableFuture<Long> estimatedSize() { HybridTimestamp now = clockService.current();
