This is an automated email from the ASF dual-hosted git repository. rpuch pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new dfe0a5d0a08 IGNITE-25863 Deduplicate awaitAssignmentsStabilization() (#6219) dfe0a5d0a08 is described below commit dfe0a5d0a08b413f6df4a6f1094dfff0ee2f5abc Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Wed Jul 9 14:49:26 2025 +0400 IGNITE-25863 Deduplicate awaitAssignmentsStabilization() (#6219) --- modules/catalog-dsl/build.gradle | 2 - .../catalog/ItCatalogApiThreadingTest.java | 44 +-------- modules/runner/build.gradle | 1 + .../ignite/internal/AssignmentsTestUtils.java | 108 +++++++++++++++++++++ .../internal/sql/engine/ItLimitOffsetTest.java | 58 ++--------- .../ignite/internal/tx/ItTxTimeoutOneNodeTest.java | 45 +-------- .../tx/readonly/ItReadOnlyTxInPastTest.java | 49 +--------- 7 files changed, 125 insertions(+), 182 deletions(-) diff --git a/modules/catalog-dsl/build.gradle b/modules/catalog-dsl/build.gradle index 41b4bc85c95..36350c2aa56 100644 --- a/modules/catalog-dsl/build.gradle +++ b/modules/catalog-dsl/build.gradle @@ -32,8 +32,6 @@ dependencies { integrationTestImplementation project(':ignite-api') integrationTestImplementation project(':ignite-catalog') integrationTestImplementation project(':ignite-system-view-api') - integrationTestImplementation project(':ignite-partition-distribution') - integrationTestImplementation project(':ignite-placement-driver-api') integrationTestImplementation testFixtures(project(':ignite-core')) integrationTestImplementation testFixtures(project(':ignite-runner')) } diff --git a/modules/catalog-dsl/src/integrationTest/java/org/apache/ignite/internal/catalog/ItCatalogApiThreadingTest.java b/modules/catalog-dsl/src/integrationTest/java/org/apache/ignite/internal/catalog/ItCatalogApiThreadingTest.java index 3bb0044a1a8..72602364c9b 100644 --- a/modules/catalog-dsl/src/integrationTest/java/org/apache/ignite/internal/catalog/ItCatalogApiThreadingTest.java +++ 
b/modules/catalog-dsl/src/integrationTest/java/org/apache/ignite/internal/catalog/ItCatalogApiThreadingTest.java @@ -18,36 +18,26 @@ package org.apache.ignite.internal.catalog; import static java.lang.Thread.currentThread; +import static org.apache.ignite.internal.AssignmentsTestUtils.awaitAssignmentsStabilizationOnDefaultZone; import static org.apache.ignite.internal.PublicApiThreadingTests.anIgniteThread; import static org.apache.ignite.internal.PublicApiThreadingTests.asyncContinuationPool; import static org.apache.ignite.internal.PublicApiThreadingTests.tryToSwitchFromUserThreadWithDelayedSchemaSync; import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME; -import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl; import static org.apache.ignite.internal.catalog.ItCatalogDslTest.POJO_RECORD_TABLE_NAME; import static org.apache.ignite.internal.catalog.ItCatalogDslTest.ZONE_NAME; -import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT; import static org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled; -import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; -import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; -import org.apache.ignite.Ignite; import org.apache.ignite.catalog.IgniteCatalog; import org.apache.ignite.catalog.definitions.TableDefinition; import org.apache.ignite.catalog.definitions.ZoneDefinition; import org.apache.ignite.internal.ClusterPerClassIntegrationTest; -import 
org.apache.ignite.internal.app.IgniteImpl; -import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; import org.apache.ignite.internal.catalog.sql.IgniteCatalogSqlImpl; -import org.apache.ignite.internal.hlc.HybridTimestamp; -import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments; -import org.apache.ignite.internal.replicator.ZonePartitionId; import org.apache.ignite.internal.wrapper.Wrappers; import org.apache.ignite.table.Table; import org.junit.jupiter.api.AfterEach; @@ -65,7 +55,7 @@ class ItCatalogApiThreadingTest extends ClusterPerClassIntegrationTest { @BeforeAll void waitForDefaultZoneStabilization() throws InterruptedException { if (colocationEnabled()) { - awaitAssignmentsStabilization(node(0)); + awaitAssignmentsStabilizationOnDefaultZone(node(0)); } } @@ -75,36 +65,6 @@ class ItCatalogApiThreadingTest extends ClusterPerClassIntegrationTest { dropAllZonesExceptDefaultOne(); } - /** - * Returns a future that completes when the Default Zone's primary replicas have been elected. - */ - // TODO: remove this method after https://issues.apache.org/jira/browse/IGNITE-25283 has been fixed. - private static void awaitAssignmentsStabilization(Ignite node) throws InterruptedException { - IgniteImpl igniteImpl = unwrapIgniteImpl(node); - - Catalog catalog = igniteImpl.catalogManager().catalog(igniteImpl.catalogManager().latestCatalogVersion()); - - CatalogZoneDescriptor defaultZone = catalog.defaultZone(); - - assertTrue(waitForCondition(() -> { - HybridTimestamp timestamp = igniteImpl.clock().now(); - - int totalPartitionSize = 0; - - // Within given test default zone is used. 
- for (int p = 0; p < DEFAULT_PARTITION_COUNT; p++) { - CompletableFuture<TokenizedAssignments> assignmentsFuture = igniteImpl.placementDriver() - .getAssignments(new ZonePartitionId(defaultZone.id(), p), timestamp); - - assertThat(assignmentsFuture, willCompleteSuccessfully()); - - totalPartitionSize += assignmentsFuture.join().nodes().size(); - } - - return totalPartitionSize == defaultZone.partitions() * defaultZone.replicas(); - }, 10_000)); - } - @ParameterizedTest @EnumSource(CatalogAsyncOperation.class) void catalogFuturesCompleteInContinuationsPool(CatalogAsyncOperation operation) { diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle index b7d8db43530..6679535229a 100644 --- a/modules/runner/build.gradle +++ b/modules/runner/build.gradle @@ -219,6 +219,7 @@ dependencies { testFixturesImplementation project(':ignite-configuration') testFixturesImplementation project(':ignite-vault') testFixturesImplementation project(':ignite-placement-driver-api') + testFixturesImplementation project(':ignite-partition-distribution') testFixturesImplementation testFixtures(project(':ignite-api')) testFixturesImplementation testFixtures(project(':ignite-core')) testFixturesImplementation testFixtures(project(':ignite-network')) diff --git a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/AssignmentsTestUtils.java b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/AssignmentsTestUtils.java new file mode 100644 index 00000000000..b32b92a0a8a --- /dev/null +++ b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/AssignmentsTestUtils.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl; +import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl; +import static org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.Ignite; +import org.apache.ignite.internal.app.IgniteImpl; +import org.apache.ignite.internal.catalog.Catalog; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.replicator.ZonePartitionId; +import org.apache.ignite.internal.table.TableImpl; + +/** + * Utilities to work with assignments in integration tests. 
+ */ +public class AssignmentsTestUtils { + /** + * Waits for assignments stabilization: that is, for all partitions to get primary replicas at least once. + * + * @param node Node via which to operate. + * @param tableName Name of the table which assignments (or which zone's assignments) to wait for. + */ + public static void awaitAssignmentsStabilization(Ignite node, String tableName) throws InterruptedException { + IgniteImpl igniteImpl = unwrapIgniteImpl(node); + TableImpl table = unwrapTableImpl(node.tables().table(tableName)); + + Catalog catalog = igniteImpl.catalogManager().catalog(igniteImpl.catalogManager().latestCatalogVersion()); + CatalogZoneDescriptor zone = catalog.zone(table.zoneId()); + assertNotNull(zone); + + HybridTimestamp timestamp = igniteImpl.clock().now(); + + assertTrue(waitForCondition(() -> { + int totalPartitionSize = 0; + + for (int p = 0; p < zone.partitions(); p++) { + CompletableFuture<TokenizedAssignments> assignmentsFuture = igniteImpl.placementDriver().getAssignments( + colocationEnabled() + ? new ZonePartitionId(table.zoneId(), p) + : new TablePartitionId(table.tableId(), p), + timestamp); + + assertThat(assignmentsFuture, willCompleteSuccessfully()); + + totalPartitionSize += assignmentsFuture.join().nodes().size(); + } + + return totalPartitionSize == zone.partitions() * zone.replicas(); + }, 10_000)); + } + + /** + * Waits until the Default Zone's assignments have stabilized, that is, until its partitions have all their replicas assigned. + */ + // TODO: remove this method after https://issues.apache.org/jira/browse/IGNITE-25283 has been fixed.
+ public static void awaitAssignmentsStabilizationOnDefaultZone(Ignite node) throws InterruptedException { + IgniteImpl igniteImpl = unwrapIgniteImpl(node); + + Catalog catalog = igniteImpl.catalogManager().catalog(igniteImpl.catalogManager().latestCatalogVersion()); + + CatalogZoneDescriptor defaultZone = catalog.defaultZone(); + assertNotNull(defaultZone); + + assertTrue(waitForCondition(() -> { + HybridTimestamp timestamp = igniteImpl.clock().now(); + + int totalPartitionSize = 0; + + for (int p = 0; p < defaultZone.partitions(); p++) { + CompletableFuture<TokenizedAssignments> assignmentsFuture = igniteImpl.placementDriver() + .getAssignments(new ZonePartitionId(defaultZone.id(), p), timestamp); + + assertThat(assignmentsFuture, willCompleteSuccessfully()); + + totalPartitionSize += assignmentsFuture.join().nodes().size(); + } + + return totalPartitionSize == defaultZone.partitions() * defaultZone.replicas(); + }, 10_000)); + } +} diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItLimitOffsetTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItLimitOffsetTest.java index e3e900ede74..23ccdd9c191 100644 --- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItLimitOffsetTest.java +++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItLimitOffsetTest.java @@ -17,31 +17,15 @@ package org.apache.ignite.internal.sql.engine; -import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl; -import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl; -import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT; -import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_REPLICA_COUNT; -import static org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled; +import static 
org.apache.ignite.internal.AssignmentsTestUtils.awaitAssignmentsStabilization; import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException; -import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; -import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; import java.math.BigDecimal; import java.util.Arrays; import java.util.List; -import java.util.concurrent.CompletableFuture; -import org.apache.ignite.Ignite; -import org.apache.ignite.internal.app.IgniteImpl; -import org.apache.ignite.internal.hlc.HybridTimestamp; -import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments; -import org.apache.ignite.internal.replicator.TablePartitionId; -import org.apache.ignite.internal.replicator.ZonePartitionId; import org.apache.ignite.internal.sql.BaseSqlIntegrationTest; import org.apache.ignite.internal.sql.engine.util.Commons; -import org.apache.ignite.internal.table.TableImpl; -import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.lang.ErrorGroups.Sql; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -51,9 +35,11 @@ import org.junit.jupiter.api.Test; * Check LIMIT and\or OFFSET commands. 
*/ public class ItLimitOffsetTest extends BaseSqlIntegrationTest { + private static final String TABLE_NAME = "test"; + @BeforeEach void beforeEach() { - sql("CREATE TABLE test (pk INT PRIMARY KEY, col0 INT)"); + sql("CREATE TABLE " + TABLE_NAME + " (pk INT PRIMARY KEY, col0 INT)"); } @AfterEach @@ -67,12 +53,9 @@ public class ItLimitOffsetTest extends BaseSqlIntegrationTest { BigDecimal moreThanUpperLong = new BigDecimal(Long.MAX_VALUE).add(new BigDecimal(1)); // TODO: https://issues.apache.org/jira/browse/IGNITE-25283 Remove - // In case of empty assignments SQL engine will throw "Mandatory nodes was excluded from mapping: []". - // In order to eliminate this assignments stabilization is needed, otherwise test may fail. Not related to collocation. - // awaitAssignmentsStabilization awaits that the default zone/table stable partition assignments size - // will be DEFAULT_PARTITION_COUNT * DEFAULT_REPLICA_COUNT. It's correct only for a single-node cluster that uses default zone, - // that's why given method isn't located in a utility class. - awaitAssignmentsStabilization(CLUSTER.aliveNode()); + // In case of empty assignments, SQL engine will throw "Mandatory nodes was excluded from mapping: []". + // In order to eliminate this, assignments stabilization is needed, otherwise test may fail. Not related to colocation. + awaitAssignmentsStabilization(CLUSTER.aliveNode(), TABLE_NAME); // cache the plan with concrete type param igniteSql().execute(null, "SELECT * FROM test OFFSET ? ROWS", new BigDecimal(Long.MAX_VALUE)); @@ -241,31 +224,4 @@ public class ItLimitOffsetTest extends BaseSqlIntegrationTest { return sb.toString(); } - - private static void awaitAssignmentsStabilization(Ignite node) throws InterruptedException { - IgniteImpl igniteImpl = unwrapIgniteImpl(node); - TableImpl table = unwrapTableImpl(node.tables().table("test")); - int tableOrZoneId = colocationEnabled() ? 
table.zoneId() : table.tableId(); - - HybridTimestamp timestamp = igniteImpl.clock().now(); - - assertTrue(IgniteTestUtils.waitForCondition(() -> { - int totalPartitionSize = 0; - - // Within given test default zone is used. - for (int p = 0; p < DEFAULT_PARTITION_COUNT; p++) { - CompletableFuture<TokenizedAssignments> assignmentsFuture = igniteImpl.placementDriver().getAssignments( - colocationEnabled() - ? new ZonePartitionId(tableOrZoneId, p) - : new TablePartitionId(tableOrZoneId, p), - timestamp); - - assertThat(assignmentsFuture, willCompleteSuccessfully()); - - totalPartitionSize += assignmentsFuture.join().nodes().size(); - } - - return totalPartitionSize == DEFAULT_PARTITION_COUNT * DEFAULT_REPLICA_COUNT; - }, 10_000)); - } } diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxTimeoutOneNodeTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxTimeoutOneNodeTest.java index dc6aa875053..1e52712d099 100644 --- a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxTimeoutOneNodeTest.java +++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxTimeoutOneNodeTest.java @@ -18,30 +18,20 @@ package org.apache.ignite.internal.tx; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.internal.AssignmentsTestUtils.awaitAssignmentsStabilization; import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl; -import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl; -import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT; -import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_REPLICA_COUNT; import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.getDefaultZone; import static org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled; import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; -import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; -import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -import java.util.concurrent.CompletableFuture; import org.apache.ignite.Ignite; import org.apache.ignite.internal.ClusterPerTestIntegrationTest; import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.catalog.CatalogManager; import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; -import org.apache.ignite.internal.hlc.HybridTimestamp; -import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments; -import org.apache.ignite.internal.replicator.TablePartitionId; -import org.apache.ignite.internal.replicator.ZonePartitionId; -import org.apache.ignite.internal.table.TableImpl; import org.apache.ignite.lang.ErrorGroups.Transactions; import org.apache.ignite.table.Table; import org.apache.ignite.tx.Transaction; @@ -76,10 +66,8 @@ abstract class ItTxTimeoutOneNodeTest extends ClusterPerTestIntegrationTest { // This test is rather fragile because it's time dependent. The test uses one second as tx timeout and assumes that it's enough // for an initial operation to find the primary replica, which might not be the case in case of concurrent interleaving rebalance. - // Not related to colocation. awaitAssignmentsStabilization awaits that the default zone/table stable partition assignments size - // will be DEFAULT_PARTITION_COUNT * DEFAULT_REPLICA_COUNT. It's correct only for a single-node cluster that uses default zone, - // that's why given method isn't located in a utility class. - awaitAssignmentsStabilization(cluster.node(0)); + // Not related to colocation. 
+ awaitAssignmentsStabilization(cluster.node(0), TABLE_NAME); return ignite().tables().table(TABLE_NAME); } @@ -158,31 +146,4 @@ abstract class ItTxTimeoutOneNodeTest extends ClusterPerTestIntegrationTest { private static void doPutOn(Table table, Transaction tx) { table.keyValueView(Integer.class, String.class).put(tx, 1, "one"); } - - private static void awaitAssignmentsStabilization(Ignite node) throws InterruptedException { - IgniteImpl igniteImpl = unwrapIgniteImpl(node); - TableImpl table = unwrapTableImpl(node.tables().table(TABLE_NAME)); - int tableOrZoneId = colocationEnabled() ? table.zoneId() : table.tableId(); - - HybridTimestamp timestamp = igniteImpl.clock().now(); - - assertTrue(waitForCondition(() -> { - int totalPartitionSize = 0; - - // Within given test, default zone is used. - for (int p = 0; p < DEFAULT_PARTITION_COUNT; p++) { - CompletableFuture<TokenizedAssignments> assignmentsFuture = igniteImpl.placementDriver().getAssignments( - colocationEnabled() - ? new ZonePartitionId(tableOrZoneId, p) - : new TablePartitionId(tableOrZoneId, p), - timestamp); - - assertThat(assignmentsFuture, willCompleteSuccessfully()); - - totalPartitionSize += assignmentsFuture.join().nodes().size(); - } - - return totalPartitionSize == DEFAULT_PARTITION_COUNT * DEFAULT_REPLICA_COUNT; - }, 10_000)); - } } diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItReadOnlyTxInPastTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItReadOnlyTxInPastTest.java index 393014acefb..425a7f9711f 100644 --- a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItReadOnlyTxInPastTest.java +++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItReadOnlyTxInPastTest.java @@ -17,30 +17,19 @@ package org.apache.ignite.internal.tx.readonly; +import static 
org.apache.ignite.internal.AssignmentsTestUtils.awaitAssignmentsStabilization; import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl; -import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl; -import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT; -import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_REPLICA_COUNT; import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.getDefaultZone; import static org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled; import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.executeUpdate; -import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; -import static org.junit.jupiter.api.Assertions.assertTrue; -import java.util.concurrent.CompletableFuture; import org.apache.ignite.Ignite; import org.apache.ignite.internal.ClusterPerTestIntegrationTest; import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.catalog.CatalogManager; import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; -import org.apache.ignite.internal.hlc.HybridTimestamp; -import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments; -import org.apache.ignite.internal.replicator.TablePartitionId; -import org.apache.ignite.internal.replicator.ZonePartitionId; -import org.apache.ignite.internal.table.TableImpl; -import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.sql.ResultSet; import org.apache.ignite.sql.SqlRow; import org.apache.ignite.tx.TransactionOptions; @@ -83,12 +72,9 @@ class ItReadOnlyTxInPastTest extends ClusterPerTestIntegrationTest { setDefaultZoneAutoAdjustScaleUpTimeoutToImmediate(); } - // In case of empty assignments SQL engine will throw 
"Mandatory nodes was excluded from mapping: []". - // In order to eliminate this assignments stabilization is needed, otherwise test may fail. Not related to collocation. - // awaitAssignmentsStabilization awaits that the default zone/table stable partition assignments size - // will be DEFAULT_PARTITION_COUNT * DEFAULT_REPLICA_COUNT. It's correct only for a single-node cluster that uses default zone, - // that's why given method isn't located in a utility class. - awaitAssignmentsStabilization(node); + // In case of empty assignments, SQL engine will throw "Mandatory nodes was excluded from mapping: []". + // In order to eliminate this, assignments stabilization is needed, otherwise test may fail. Not related to colocation. + awaitAssignmentsStabilization(node, TABLE_NAME); long count = node.transactions().runInTransaction(tx -> { return cluster.doInSession(0, session -> { @@ -119,31 +105,4 @@ class ItReadOnlyTxInPastTest extends ClusterPerTestIntegrationTest { node(0).sql().executeScript(String.format("ALTER ZONE \"%s\"SET (AUTO SCALE UP 0)", defaultZone.name())); } - - private static void awaitAssignmentsStabilization(Ignite node) throws InterruptedException { - IgniteImpl igniteImpl = unwrapIgniteImpl(node); - TableImpl table = unwrapTableImpl(node.tables().table(TABLE_NAME)); - int tableOrZoneId = colocationEnabled() ? table.zoneId() : table.tableId(); - - HybridTimestamp timestamp = igniteImpl.clock().now(); - - assertTrue(IgniteTestUtils.waitForCondition(() -> { - int totalPartitionSize = 0; - - // Within given test default zone is used. - for (int p = 0; p < DEFAULT_PARTITION_COUNT; p++) { - CompletableFuture<TokenizedAssignments> assignmentsFuture = igniteImpl.placementDriver().getAssignments( - colocationEnabled() - ? 
new ZonePartitionId(tableOrZoneId, p) - : new TablePartitionId(tableOrZoneId, p), - timestamp); - - assertThat(assignmentsFuture, willCompleteSuccessfully()); - - totalPartitionSize += assignmentsFuture.join().nodes().size(); - } - - return totalPartitionSize == DEFAULT_PARTITION_COUNT * DEFAULT_REPLICA_COUNT; - }, 10_000)); - } }