This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6e028c3a8de IGNITE-25283 Substitute PD.getAssignments with
PD.awaitNonEmptyAssignments (#6641)
6e028c3a8de is described below
commit 6e028c3a8de875459c1cda3bec04037d30746d22
Author: Denis Chudov <[email protected]>
AuthorDate: Thu Oct 9 08:40:34 2025 +0300
IGNITE-25283 Substitute PD.getAssignments with PD.awaitNonEmptyAssignments
(#6641)
---
.../catalog/ItCatalogApiThreadingTest.java | 10 --
.../ignite/client/handler/FakePlacementDriver.java | 6 +
.../compute/ClassLoaderExceptionsMapper.java | 12 +-
.../ignite/internal/util/ExceptionUtils.java | 15 ++
.../distributionzones/ItEmptyDataNodesTest.java | 51 +++++-
.../ignite/internal/index/TestPlacementDriver.java | 8 +-
.../ignite/jdbc/ItJdbcAuthenticationTest.java | 4 -
.../partition/replicator/fixtures/Node.java | 2 +-
.../replicator/fixtures/TestPlacementDriver.java | 12 ++
.../AssignmentsPlacementDriver.java | 17 ++
.../placementdriver/EmptyAssignmentsException.java | 19 ++-
.../wrappers/DelegatingPlacementDriver.java | 9 +
.../wrappers/ExecutorInclinedPlacementDriver.java | 8 +
.../placementdriver/TestPlacementDriver.java | 8 +-
.../MultiActorPlacementDriverTest.java | 2 +-
.../PlacementDriverManagerTest.java | 2 +-
.../placementdriver/AssignmentsTracker.java | 183 +++++++++++++++++++--
.../placementdriver/PlacementDriverManager.java | 14 +-
.../internal/placementdriver/ActiveActorTest.java | 2 +-
.../placementdriver/AssignmentsTrackerTest.java | 152 +++++++++++++++++
.../placementdriver/LeaseNegotiationTest.java | 4 +-
.../internal/placementdriver/LeaseUpdaterTest.java | 4 +-
.../placementdriver/PlacementDriverTest.java | 4 +-
.../runner/app/ItIgniteNodeRestartTest.java | 2 +-
.../ignite/internal/AssignmentsTestUtils.java | 30 ----
modules/sql-engine/build.gradle | 2 -
.../internal/sql/engine/ItColocatedDataTest.java | 48 ------
.../internal/sql/engine/ItLimitOffsetTest.java | 6 -
.../mapping/ExecutionDistributionProviderImpl.java | 10 +-
.../internal/table/metrics/ItTableMetricsTest.java | 6 -
.../internal/tx/ItTransactionMetricsTest.java | 5 -
31 files changed, 509 insertions(+), 148 deletions(-)
diff --git
a/modules/catalog-dsl/src/integrationTest/java/org/apache/ignite/internal/catalog/ItCatalogApiThreadingTest.java
b/modules/catalog-dsl/src/integrationTest/java/org/apache/ignite/internal/catalog/ItCatalogApiThreadingTest.java
index bb0217af5c3..59b9682e75c 100644
---
a/modules/catalog-dsl/src/integrationTest/java/org/apache/ignite/internal/catalog/ItCatalogApiThreadingTest.java
+++
b/modules/catalog-dsl/src/integrationTest/java/org/apache/ignite/internal/catalog/ItCatalogApiThreadingTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.catalog;
import static java.lang.Thread.currentThread;
-import static
org.apache.ignite.internal.AssignmentsTestUtils.awaitAssignmentsStabilizationOnDefaultZone;
import static
org.apache.ignite.internal.PublicApiThreadingTests.anIgniteThread;
import static
org.apache.ignite.internal.PublicApiThreadingTests.asyncContinuationPool;
import static
org.apache.ignite.internal.PublicApiThreadingTests.tryToSwitchFromUserThreadWithDelayedSchemaSync;
@@ -26,7 +25,6 @@ import static
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERS
import static
org.apache.ignite.internal.catalog.ItCatalogDslTest.POJO_RECORD_TABLE_NAME;
import static org.apache.ignite.internal.catalog.ItCatalogDslTest.ZONE_NAME;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_FILTER;
-import static
org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -42,7 +40,6 @@ import
org.apache.ignite.internal.catalog.sql.IgniteCatalogSqlImpl;
import org.apache.ignite.internal.wrapper.Wrappers;
import org.apache.ignite.table.Table;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
@@ -53,13 +50,6 @@ class ItCatalogApiThreadingTest extends
ClusterPerClassIntegrationTest {
return 1;
}
- @BeforeAll
- void waitForDefaultZoneStabilization() throws InterruptedException {
- if (colocationEnabled()) {
- awaitAssignmentsStabilizationOnDefaultZone(node(0));
- }
- }
-
@AfterEach
void clearDatabase() {
dropAllTables();
diff --git
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
index 06e6c375636..e45f705b5ec 100644
---
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
+++
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
@@ -130,6 +130,12 @@ public class FakePlacementDriver extends
AbstractEventProducer<PrimaryReplicaEve
return failedFuture(new
UnsupportedOperationException("getAssignments() is not supported in
FakePlacementDriver yet."));
}
+ @Override
+ public CompletableFuture<List<TokenizedAssignments>>
awaitNonEmptyAssignments(List<? extends ReplicationGroupId> replicationGroupIds,
+ HybridTimestamp clusterTimeToAwait, long timeoutMillis) {
+ return failedFuture(new
UnsupportedOperationException("awaitNonEmptyAssignments() is not supported in
FakePlacementDriver yet."));
+ }
+
public List<ReplicaMeta> primaryReplicas() {
return primaryReplicas;
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ClassLoaderExceptionsMapper.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ClassLoaderExceptionsMapper.java
index d5110e0216e..c2c6ff195d2 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ClassLoaderExceptionsMapper.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ClassLoaderExceptionsMapper.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.compute;
+import static
org.apache.ignite.internal.util.ExceptionUtils.unwrapCompletionThrowable;
+
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.apache.ignite.internal.compute.loader.JobContext;
@@ -38,7 +40,7 @@ class ClassLoaderExceptionsMapper {
) {
return future.handle((v, e) -> {
if (e instanceof Exception) {
- throw new
CompletionException(mapException(unwrapCompletionException((Exception) e),
jobClassName));
+ throw new CompletionException(mapException((Exception)
unwrapCompletionThrowable((Exception) e), jobClassName));
} else {
return v;
}
@@ -70,12 +72,4 @@ class ClassLoaderExceptionsMapper {
return e;
}
}
-
- private static Exception unwrapCompletionException(Exception exception) {
- if (exception instanceof CompletionException) {
- return (Exception) exception.getCause();
- } else {
- return exception;
- }
- }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
index 3de2c2cf12d..e7d52116b5c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
@@ -496,6 +496,21 @@ public final class ExceptionUtils {
return e;
}
+ /**
+ * Unwraps the cause from {@link CompletionException} if the provided
exception is an instance of it.
+ *
+ * @param t Given throwable.
+ * @return Unwrapped throwable.
+ */
+ @Nullable
+ public static Throwable unwrapCompletionThrowable(@Nullable Throwable t) {
+ if (t instanceof CompletionException) {
+ return t.getCause();
+ } else {
+ return t;
+ }
+ }
+
/**
* Creates a new exception, which type is defined by the provided {@code
supplier}, with the specified {@code t} as a cause.
* In the case when the provided cause {@code t} is an instance of {@link
TraceableException},
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItEmptyDataNodesTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItEmptyDataNodesTest.java
index 5d05a250f88..bfd049af5c3 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItEmptyDataNodesTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItEmptyDataNodesTest.java
@@ -20,11 +20,14 @@ package org.apache.ignite.internal.distributionzones;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.ByteUtils.bytesToIntKeepingOrder;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
import java.nio.charset.StandardCharsets;
import java.util.Set;
@@ -36,6 +39,8 @@ import org.apache.ignite.internal.app.IgniteImpl;
import
org.apache.ignite.internal.distributionzones.exception.EmptyDataNodesException;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.metastorage.server.WatchListenerInhibitor;
+import org.apache.ignite.internal.placementdriver.EmptyAssignmentsException;
+import org.apache.ignite.sql.SqlException;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;
@@ -54,6 +59,7 @@ class ItEmptyDataNodesTest extends
ClusterPerTestIntegrationTest {
setAdditionalNodeFilter(null);
+ // Trigger scale down and data nodes recalculation.
stopNode(2);
int aliveNodesCount = initialNodes() - 1;
@@ -101,6 +107,45 @@ class ItEmptyDataNodesTest extends
ClusterPerTestIntegrationTest {
sql("SELECT * FROM " + TABLE_NAME + " WHERE id = 1");
}
+ @Test
+ public void testInitialEmptyAssignmentsWithSuccessfulWaiting() throws
InterruptedException {
+ createZoneAndTableWithEmptyDataNodes();
+
+ IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
+ int zoneId =
node.catalogManager().activeCatalog(node.clock().now().longValue()).zone(ZONE_NAME.toUpperCase()).id();
+
+ assertTrue(currentDataNodes(node, zoneId).isEmpty());
+
+ setAdditionalNodeFilter(null);
+
+ // Assignments placement driver should wait for non-empty assignments
for SQL.
+ CompletableFuture<?> sqlFut = sqlAsync("SELECT * FROM " + TABLE_NAME);
+
+ // Trigger scale down and data nodes recalculation.
+ stopNode(2);
+
+ assertThat(sqlFut, willCompleteSuccessfully());
+ }
+
+ @Test
+ public void testEmptyAssignmentsException() {
+ createZoneAndTableWithEmptyDataNodes();
+
+ IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
+ int zoneId =
node.catalogManager().activeCatalog(node.clock().now().longValue()).zone(ZONE_NAME.toUpperCase()).id();
+
+ assertTrue(currentDataNodes(node, zoneId).isEmpty());
+
+ try {
+ sql("SELECT * FROM " + TABLE_NAME);
+ fail();
+ } catch (Exception e) {
+ assertInstanceOf(SqlException.class, e);
+ assertTrue(hasCause(e, EmptyAssignmentsException.class, null));
+ assertTrue(hasCause(e, EmptyDataNodesException.class, null));
+ }
+ }
+
private void createZoneAndTableWithEmptyDataNodes() {
setAdditionalNodeFilter(n -> false);
@@ -109,7 +154,7 @@ class ItEmptyDataNodesTest extends
ClusterPerTestIntegrationTest {
sql(format("CREATE TABLE {} (id INT PRIMARY KEY, val INT) ZONE {}",
TABLE_NAME, ZONE_NAME));
}
- private Set<String> currentDataNodes(IgniteImpl node, int zoneId) {
+ private static Set<String> currentDataNodes(IgniteImpl node, int zoneId) {
CompletableFuture<Set<String>> nodeFut =
node.distributionZoneManager().currentDataNodes(zoneId);
assertThat(nodeFut, willCompleteSuccessfully());
return nodeFut.join();
@@ -119,6 +164,10 @@ class ItEmptyDataNodesTest extends
ClusterPerTestIntegrationTest {
cluster.aliveNode().sql().execute(null, sql);
}
+ private CompletableFuture<?> sqlAsync(String sql) {
+ return cluster.aliveNode().sql().executeAsync(null, sql);
+ }
+
private void setAdditionalNodeFilter(@Nullable
Predicate<NodeWithAttributes> filter) {
cluster.runningNodes()
.map(TestWrappers::unwrapIgniteImpl)
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
index d2a08a2c57f..53df45122c8 100644
---
a/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
@@ -68,7 +68,13 @@ class TestPlacementDriver extends
AbstractEventProducer<PrimaryReplicaEvent, Pri
List<? extends ReplicationGroupId> replicationGroupIds,
HybridTimestamp clusterTimeToAwait
) {
- return failedFuture(new
UnsupportedOperationException("getAssignments() is not supported in
FakePlacementDriver yet."));
+ return failedFuture(new
UnsupportedOperationException("getAssignments() is not supported in
TestPlacementDriver yet."));
+ }
+
+ @Override
+ public CompletableFuture<List<TokenizedAssignments>>
awaitNonEmptyAssignments(List<? extends ReplicationGroupId> replicationGroupIds,
+ HybridTimestamp clusterTimeToAwait, long timeoutMillis) {
+ return failedFuture(new
UnsupportedOperationException("awaitNonEmptyAssignments() is not supported in
TestPlacementDriver yet."));
}
CompletableFuture<Void> setPrimaryReplicaMeta(
diff --git
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcAuthenticationTest.java
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcAuthenticationTest.java
index 6cba4917ef7..61b1a69cd76 100644
---
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcAuthenticationTest.java
+++
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcAuthenticationTest.java
@@ -31,7 +31,6 @@ import java.sql.Statement;
import org.apache.ignite.InitParametersBuilder;
import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
import org.apache.ignite.jdbc.util.JdbcTestUtils;
-import org.apache.ignite.table.Tuple;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
@@ -117,9 +116,6 @@ class ItJdbcAuthenticationTest {
void jdbcCurrentUser() throws SQLException {
CLUSTER.aliveNode().sql().execute(null, "CREATE TABLE t1 (id INT
PRIMARY KEY, val VARCHAR)").close();
- // TODO https://issues.apache.org/jira/browse/IGNITE-25283 Remove
next line.
- CLUSTER.aliveNode().tables().table("t1").keyValueView().get(null,
Tuple.create().set("id", 1));
-
String connString =
"jdbc:ignite:thin://127.0.0.1:10800?username={}&password={}";
String user1 = "usr";
String user2 = "admin";
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index 147070ef5f7..8458a276da2 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -556,7 +556,7 @@ public class Node {
Runnable::run,
metricManager,
zoneId -> completedFuture(Set.of()),
- id -> null
+ zoneId -> null
);
var transactionInflights = new
TransactionInflights(placementDriverManager.placementDriver(), clockService);
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/TestPlacementDriver.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/TestPlacementDriver.java
index f0c78735a44..b8bff4db7a6 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/TestPlacementDriver.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/TestPlacementDriver.java
@@ -127,6 +127,18 @@ public class TestPlacementDriver extends
AbstractEventProducer<PrimaryReplicaEve
}
}
+ @Override
+ public CompletableFuture<List<TokenizedAssignments>>
awaitNonEmptyAssignments(List<? extends ReplicationGroupId> replicationGroupIds,
+ HybridTimestamp clusterTimeToAwait, long timeoutMillis) {
+ List<TokenizedAssignments> assignments = tokenizedAssignments;
+
+ if (assignments == null) {
+ return failedFuture(new AssertionError("Pre-calculated assignments
are not defined in test PlacementDriver yet."));
+ } else {
+ return completedFuture(assignments);
+ }
+ }
+
private CompletableFuture<ReplicaMeta>
getPrimaryReplicaMeta(ReplicationGroupId replicationGroupId) {
if (replicationGroupId instanceof ZonePartitionId &&
((ZonePartitionId) replicationGroupId).zoneId() == DEFAULT_ZONE_ID) {
return nullCompletedFuture();
diff --git
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java
index ae16bfde1b0..c07bb7d2d59 100644
---
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java
+++
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java
@@ -60,4 +60,21 @@ public interface AssignmentsPlacementDriver {
List<? extends ReplicationGroupId> replicationGroupIds,
HybridTimestamp clusterTimeToAwait
);
+
+ /**
+ * Returns a future with the list of tokenized assignments for the specified
replication group ids. The future will be completed when
+ * either non-empty assignments for all specified replication group ids
are available or timeoutMillis has elapsed since the method was
+ * called. If timeoutMillis has elapsed and there are still replication group
ids without assignments, then
+ * {@link EmptyAssignmentsException} is thrown.
+ *
+ * @param replicationGroupIds List of replication group Ids.
+ * @param clusterTimeToAwait Cluster time to await.
+ * @param timeoutMillis Timeout in milliseconds.
+ * @return List of tokenized assignments.
+ */
+ CompletableFuture<List<TokenizedAssignments>> awaitNonEmptyAssignments(
+ List<? extends ReplicationGroupId> replicationGroupIds,
+ HybridTimestamp clusterTimeToAwait,
+ long timeoutMillis
+ );
}
diff --git
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/EmptyAssignmentsException.java
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/EmptyAssignmentsException.java
index 00cd17e43a0..55133dc8e0c 100644
---
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/EmptyAssignmentsException.java
+++
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/EmptyAssignmentsException.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.lang.ErrorGroups.PlacementDriver.EMPTY_ASSIGNMEN
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.lang.IgniteException;
+import org.jetbrains.annotations.Nullable;
/**
* Exception thrown when there are no assignments available.
@@ -29,12 +30,26 @@ import org.apache.ignite.lang.IgniteException;
public class EmptyAssignmentsException extends IgniteException {
private static final long serialVersionUID = 1698246028174494488L;
+ private final ReplicationGroupId groupId;
+
/**
* Constructor.
*
* @param groupId Replication group id.
+ * @param cause Optional cause.
+ */
+ public EmptyAssignmentsException(ReplicationGroupId groupId, @Nullable
Throwable cause) {
+ super(EMPTY_ASSIGNMENTS_ERR, format("Empty assignments for group
[groupId={}].", groupId), cause);
+
+ this.groupId = groupId;
+ }
+
+ /**
+ * Gets replication group id.
+ *
+ * @return Replication group id.
*/
- public EmptyAssignmentsException(ReplicationGroupId groupId) {
- super(EMPTY_ASSIGNMENTS_ERR, format("Empty assignments for group
[groupId={}].", groupId));
+ public ReplicationGroupId groupId() {
+ return groupId;
}
}
diff --git
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/wrappers/DelegatingPlacementDriver.java
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/wrappers/DelegatingPlacementDriver.java
index ac5214268a4..c60d9b0e5e7 100644
---
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/wrappers/DelegatingPlacementDriver.java
+++
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/wrappers/DelegatingPlacementDriver.java
@@ -66,6 +66,15 @@ abstract class DelegatingPlacementDriver implements
PlacementDriver {
return delegate.getCurrentPrimaryReplica(replicationGroupId,
timestamp);
}
+ @Override
+ public CompletableFuture<List<TokenizedAssignments>>
awaitNonEmptyAssignments(
+ List<? extends ReplicationGroupId> replicationGroupIds,
+ HybridTimestamp clusterTimeToAwait,
+ long timeoutMillis
+ ) {
+ return delegate.awaitNonEmptyAssignments(replicationGroupIds,
clusterTimeToAwait, timeoutMillis);
+ }
+
@Override
public CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId
grpId) {
return delegate.previousPrimaryExpired(grpId);
diff --git
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/wrappers/ExecutorInclinedPlacementDriver.java
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/wrappers/ExecutorInclinedPlacementDriver.java
index 94e6f9bc5fc..b514be9b950 100644
---
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/wrappers/ExecutorInclinedPlacementDriver.java
+++
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/wrappers/ExecutorInclinedPlacementDriver.java
@@ -19,10 +19,12 @@ package org.apache.ignite.internal.placementdriver.wrappers;
import static java.util.function.Function.identity;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
@@ -66,4 +68,10 @@ public class ExecutorInclinedPlacementDriver extends
DelegatingPlacementDriver {
public CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId
grpId) {
return decorateFuture(super.previousPrimaryExpired(grpId));
}
+
+ @Override
+ public CompletableFuture<List<TokenizedAssignments>>
awaitNonEmptyAssignments(List<? extends ReplicationGroupId> replicationGroupIds,
+ HybridTimestamp clusterTimeToAwait, long timeoutMillis) {
+ return
decorateFuture(super.awaitNonEmptyAssignments(replicationGroupIds,
clusterTimeToAwait, timeoutMillis));
+ }
}
diff --git
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
index d0e92aaea52..3f5b5c7827a 100644
---
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
+++
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
@@ -103,7 +103,13 @@ public class TestPlacementDriver extends
AbstractEventProducer<PrimaryReplicaEve
List<? extends ReplicationGroupId> replicationGroupIds,
HybridTimestamp clusterTimeToAwait
) {
- return failedFuture(new
UnsupportedOperationException("getAssignments() is not supported in
FakePlacementDriver yet."));
+ return failedFuture(new
UnsupportedOperationException("getAssignments() is not supported in
TestPlacementDriver yet."));
+ }
+
+ @Override
+ public CompletableFuture<List<TokenizedAssignments>>
awaitNonEmptyAssignments(List<? extends ReplicationGroupId> replicationGroupIds,
+ HybridTimestamp clusterTimeToAwait, long timeoutMillis) {
+ return failedFuture(new
UnsupportedOperationException("awaitNonEmptyAssignments() is not supported in
TestPlacementDriver yet."));
}
private CompletableFuture<ReplicaMeta> getReplicaMetaFuture() {
diff --git
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
index a7aa437454e..01be19c4c28 100644
---
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
+++
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
@@ -330,7 +330,7 @@ public class MultiActorPlacementDriverTest extends
BasePlacementDriverTest {
Runnable::run,
mock(MetricManager.class),
zoneId -> completedFuture(Set.of()),
- id -> null
+ zoneId -> null
);
res.add(new Node(
diff --git
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
index 0f01b1831f9..1550f2ff94e 100644
---
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
+++
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
@@ -269,7 +269,7 @@ public class PlacementDriverManagerTest extends
BasePlacementDriverTest {
Runnable::run,
mock(MetricManager.class),
zoneId -> completedFuture(Set.of()),
- id -> null
+ zoneId -> null
);
ComponentContext componentContext = new ComponentContext();
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
index 8be2854c545..c6eb797349b 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
@@ -17,11 +17,19 @@
package org.apache.ignite.internal.placementdriver;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.function.Function.identity;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.PENDING_ASSIGNMENTS_QUEUE_PREFIX_BYTES;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractTablePartitionId;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.extractZonePartitionId;
+import static
org.apache.ignite.internal.placementdriver.Utils.extractZoneIdFromGroupId;
+import static org.apache.ignite.internal.util.CompletableFutures.allOf;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
+import static
org.apache.ignite.internal.util.ExceptionUtils.unwrapCompletionThrowable;
+import static
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import java.util.ArrayList;
@@ -31,9 +39,12 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.internal.components.NodeProperties;
+import
org.apache.ignite.internal.distributionzones.exception.EmptyDataNodesException;
import
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureProcessor;
@@ -55,6 +66,7 @@ import
org.apache.ignite.internal.partitiondistribution.TokenizedAssignmentsImpl
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.jetbrains.annotations.Nullable;
/**
* The class tracks assignment of all replication groups.
@@ -87,6 +99,14 @@ public class AssignmentsTracker implements
AssignmentsPlacementDriver {
/** Pending assignment Meta storage watch listener. */
private final WatchListener pendingAssignmentsListener;
+ /** Maps replication group ids to futures that are created when assignments
are empty. */
+ private final Map<ReplicationGroupId,
CompletableFuture<TokenizedAssignments>> nonEmptyAssignmentsFutures = new
ConcurrentHashMap<>();
+
+ private final Function<Integer, CompletableFuture<Set<String>>>
currentDataNodesProvider;
+
+ /** Resolver of zone id by table id (result may be {@code null}). */
+ private final Function<Integer, Integer> zoneIdByTableIdResolver;
+
/**
* The constructor.
*
@@ -97,7 +117,9 @@ public class AssignmentsTracker implements
AssignmentsPlacementDriver {
public AssignmentsTracker(
MetaStorageManager msManager,
FailureProcessor failureProcessor,
- NodeProperties nodeProperties
+ NodeProperties nodeProperties,
+ Function<Integer, CompletableFuture<Set<String>>>
currentDataNodesProvider,
+ Function<Integer, Integer> zoneIdByTableIdResolver
) {
this.msManager = msManager;
this.failureProcessor = failureProcessor;
@@ -108,6 +130,9 @@ public class AssignmentsTracker implements
AssignmentsPlacementDriver {
this.groupPendingAssignments = new ConcurrentHashMap<>();
this.pendingAssignmentsListener = createPendingAssignmentsListener();
+
+ this.currentDataNodesProvider = currentDataNodesProvider;
+ this.zoneIdByTableIdResolver = zoneIdByTableIdResolver;
}
/**
@@ -119,10 +144,10 @@ public class AssignmentsTracker implements
AssignmentsPlacementDriver {
msManager.recoveryFinishedFuture().thenAccept(recoveryRevisions -> {
handleRecoveryAssignments(recoveryRevisions,
pendingAssignmentsQueuePrefixBytes(), groupPendingAssignments,
- bytes -> AssignmentsQueue.fromBytes(bytes).poll().nodes()
+ bytes -> AssignmentsQueue.fromBytes(bytes).poll().nodes(),
false
);
handleRecoveryAssignments(recoveryRevisions,
stableAssignmentsPrefixBytes(), groupStableAssignments,
- bytes -> Assignments.fromBytes(bytes).nodes()
+ bytes -> Assignments.fromBytes(bytes).nodes(), true
);
}).whenComplete((res, ex) -> {
if (ex != null) {
@@ -161,6 +186,122 @@ public class AssignmentsTracker implements
AssignmentsPlacementDriver {
}));
}
+ @Override
+ public CompletableFuture<List<TokenizedAssignments>>
awaitNonEmptyAssignments(
+ List<? extends ReplicationGroupId> replicationGroupIds,
+ HybridTimestamp clusterTimeToAwait,
+ long timeoutMillis
+ ) {
+ // Two-phase wait: first the meta storage cluster time must reach clusterTimeToAwait, and only then the
+ // assignments of the requested groups are examined (and awaited if needed) under the busy lock.
+ // NOTE(review): the trailing thenApply(identity()) does not transform the value; presumably it only hands
+ // callers a separate dependent future - confirm before removing.
+ return msManager
+ .clusterTime()
+ .waitFor(clusterTimeToAwait)
+ .thenCompose(ignored -> inBusyLock(busyLock, () -> {
+ // The timeout countdown starts here, i.e. the time spent waiting for the cluster time is not counted.
+ long now = coarseCurrentTimeMillis();
+ return
awaitNonEmptyAssignmentsWithCheckMostRecent(replicationGroupIds, now,
timeoutMillis);
+ }))
+ .thenApply(identity());
+ }
+
+ /**
+  * Returns the stable assignments of the given groups, waiting for every group whose assignments are currently
+  * missing or empty. After a wait succeeds, all groups are read again, so the returned list always reflects one
+  * fresh snapshot of the stable assignments rather than values captured at different moments.
+  *
+  * @param replicationGroupIds Groups to get assignments for; the result follows the same order.
+  * @param startTime Wall-clock time in milliseconds at which the wait governed by {@code timeoutMillis} started.
+  * @param timeoutMillis Time budget in milliseconds counted from {@code startTime}; if it is zero or negative,
+  *         groups with empty assignments fail immediately.
+  * @return Future with non-empty assignments for all groups. It fails with {@link EmptyAssignmentsException} when
+  *         some group still has no assignments after the budget is spent, or with the original cause for any other
+  *         failure.
+  */
+ private CompletableFuture<List<TokenizedAssignments>> awaitNonEmptyAssignmentsWithCheckMostRecent(
+         List<? extends ReplicationGroupId> replicationGroupIds,
+         long startTime,
+         long timeoutMillis
+ ) {
+     Map<ReplicationGroupId, TokenizedAssignments> assignmentsMap = stableAssignments();
+
+     // Keyed by the index of the group in the request; only groups that have to be awaited get an entry.
+     Map<Integer, CompletableFuture<TokenizedAssignments>> futures = new HashMap<>();
+     List<TokenizedAssignments> result = new ArrayList<>(replicationGroupIds.size());
+
+     for (int i = 0; i < replicationGroupIds.size(); i++) {
+         ReplicationGroupId groupId = replicationGroupIds.get(i);
+
+         TokenizedAssignments a = assignmentsMap.get(groupId);
+
+         if (a == null || a.nodes().isEmpty()) {
+             if (timeoutMillis > 0) {
+                 futures.put(i, nonEmptyAssignmentFuture(groupId, timeoutMillis));
+             } else {
+                 // The budget is already spent. Fail with the same exception type a timed-out wait produces, so that
+                 // the handler below routes it through #checkEmptyAssignmentsReason() and the caller gets a consistent
+                 // exception (a bare TimeoutException would bypass that check).
+                 futures.put(i, failedFuture(new EmptyAssignmentsException(groupId, new TimeoutException())));
+             }
+         } else {
+             result.add(a);
+         }
+     }
+
+     if (futures.isEmpty()) {
+         // Nothing had to be awaited, so 'result' holds an entry for every requested group.
+         return completedFuture(result);
+     }
+
+     return allOf(futures.values())
+             .handle((unused, e) -> {
+                 CompletableFuture<List<TokenizedAssignments>> r;
+                 Throwable cause = unwrapCompletionThrowable(e);
+
+                 if (cause == null) {
+                     // Get the most recent assignments after the waiting. The elapsed time is charged exactly once:
+                     // the remaining budget is passed together with 'now' as its new starting point. Passing the
+                     // original start time here would subtract the already-charged time again on the next round.
+                     long now = System.currentTimeMillis();
+                     long remainingMillis = timeoutMillis - (now - startTime);
+                     r = awaitNonEmptyAssignmentsWithCheckMostRecent(replicationGroupIds, now, remainingMillis);
+                 } else if (cause instanceof EmptyAssignmentsException) {
+                     r = checkEmptyAssignmentsReason((EmptyAssignmentsException) cause);
+                 } else {
+                     r = failedFuture(cause);
+                 }
+
+                 return r;
+             })
+             .thenCompose(identity());
+ }
+
+ /**
+  * Builds the failure to report for a group whose assignments stayed empty, attaching a more specific cause when
+  * one can be determined.
+  *
+  * <p>The zone ID is resolved from the group ID of the given exception. If it cannot be resolved ({@code null}),
+  * the returned future fails with the given exception as is. Otherwise the current data nodes of the zone are
+  * requested: if that set is empty, the future fails with a new {@link EmptyAssignmentsException} caused by
+  * {@link EmptyDataNodesException}; if it is not empty, the given exception is rethrown.
+  *
+  * @param ex Exception describing the group with empty assignments.
+  * @return Future that carries the failure; no branch produces a regular value (this assumes {@code sneakyThrow}
+  *         always throws, which makes the following {@code return null} unreachable - TODO confirm).
+  */
+ private CompletableFuture<List<TokenizedAssignments>>
checkEmptyAssignmentsReason(EmptyAssignmentsException ex) {
+ Integer zoneId = extractZoneIdFromGroupId(
+ ex.groupId(),
+ nodeProperties.colocationEnabled(),
+ zoneIdByTableIdResolver
+ );
+
+ if (zoneId == null) {
+ return failedFuture(ex);
+ } else {
+ return currentDataNodesProvider.apply(zoneId)
+ .thenApply(dataNodes -> {
+ if (dataNodes.isEmpty()) {
+ throw new EmptyAssignmentsException(ex.groupId(),
new EmptyDataNodesException(zoneId));
+ } else {
+ sneakyThrow(ex);
+ return null;
+ }
+ });
+ }
+ }
+
+ /**
+  * Returns the future that completes when stable assignments of the given group become non-empty. The future is
+  * shared between all concurrent waiters of the group.
+  *
+  * <p>NOTE(review): because the future is shared, its timeout is the one passed by the caller that created it; a
+  * caller that joins later may observe a shorter wait than it asked for.
+  *
+  * @param groupId Replication group ID.
+  * @param futureTimeoutMillis Timeout in milliseconds applied when the future is created by this call.
+  * @return Future with the assignments; it fails with {@link EmptyAssignmentsException} if the timeout elapses first.
+  */
+ private CompletableFuture<TokenizedAssignments> nonEmptyAssignmentFuture(ReplicationGroupId groupId, long futureTimeoutMillis) {
+     CompletableFuture<TokenizedAssignments> result = nonEmptyAssignmentsFutures.computeIfAbsent(groupId, k ->
+             new CompletableFuture<TokenizedAssignments>()
+                     .orTimeout(futureTimeoutMillis, TimeUnit.MILLISECONDS)
+                     .handle((v, e) -> {
+                         if (e instanceof TimeoutException) {
+                             throw new EmptyAssignmentsException(groupId, e);
+                         } else if (e != null) {
+                             sneakyThrow(e);
+                             return null;
+                         } else {
+                             return v;
+                         }
+                     })
+     );
+
+     // Drop the mapping once the future is done. Without this a timed-out future stays in the map, and every later
+     // caller gets that already-failed future from computeIfAbsent() and fails immediately instead of waiting for its
+     // own timeout. The two-argument remove() leaves a newer future registered for the same group untouched.
+     // This is registered outside of computeIfAbsent() on purpose: the callback may run synchronously and must not
+     // modify the map from within its own mapping function.
+     result.whenComplete((v, e) -> nonEmptyAssignmentsFutures.remove(groupId, result));
+
+     // The assignments could have arrived between the caller's check and the registration above; in that case nobody
+     // else is going to complete the future, so complete it here.
+     TokenizedAssignments assignments = groupStableAssignments.get(groupId);
+     if (assignments != null && !assignments.nodes().isEmpty()) {
+         nonEmptyAssignmentsFutures.remove(groupId, result);
+         result.complete(assignments);
+     }
+
+     return result;
+ }
+
/**
* Gets stable assignments.
*
@@ -186,7 +327,7 @@ public class AssignmentsTracker implements
AssignmentsPlacementDriver {
}
handleReceivedAssignments(event, stableAssignmentsPrefixBytes(),
groupStableAssignments,
- bytes -> Assignments.fromBytes(bytes).nodes()
+ bytes -> Assignments.fromBytes(bytes).nodes(), true
);
return nullCompletedFuture();
@@ -200,7 +341,7 @@ public class AssignmentsTracker implements
AssignmentsPlacementDriver {
}
handleReceivedAssignments(event,
pendingAssignmentsQueuePrefixBytes(), groupPendingAssignments,
- bytes -> AssignmentsQueue.fromBytes(bytes).poll().nodes()
+ bytes -> AssignmentsQueue.fromBytes(bytes).poll().nodes(),
false
);
return nullCompletedFuture();
@@ -211,7 +352,8 @@ public class AssignmentsTracker implements
AssignmentsPlacementDriver {
WatchEvent event,
byte[] assignmentsMetastoreKeyPrefix,
Map<ReplicationGroupId, TokenizedAssignments>
groupIdToAssignmentsMap,
- Function<byte[], Set<Assignment>> deserializer
+ Function<byte[], Set<Assignment>> deserializer,
+ boolean isStable
) {
for (EntryEvent evt : event.entryEvents()) {
Entry entry = evt.newEntry();
@@ -220,8 +362,9 @@ public class AssignmentsTracker implements
AssignmentsPlacementDriver {
if (entry.tombstone()) {
groupIdToAssignmentsMap.remove(grpId);
+ completeNonEmptyAssignmentsFutureIfExists(grpId, null);
} else {
- updateGroupAssignments(groupIdToAssignmentsMap, grpId, entry,
deserializer);
+ updateGroupAssignments(groupIdToAssignmentsMap, grpId, entry,
deserializer, isStable);
}
}
}
@@ -230,7 +373,8 @@ public class AssignmentsTracker implements
AssignmentsPlacementDriver {
Revisions recoveryRevisions,
byte[] assignmentsMetastoreKeyPrefix,
Map<ReplicationGroupId, TokenizedAssignments>
groupIdToAssignmentsMap,
- Function<byte[], Set<Assignment>> deserializer
+ Function<byte[], Set<Assignment>> deserializer,
+ boolean isStable
) {
var prefix = new ByteArray(assignmentsMetastoreKeyPrefix);
@@ -244,16 +388,17 @@ public class AssignmentsTracker implements
AssignmentsPlacementDriver {
ReplicationGroupId grpId =
extractReplicationGroupPartitionId(entry.key(), assignmentsMetastoreKeyPrefix);
- updateGroupAssignments(groupIdToAssignmentsMap, grpId, entry,
deserializer);
+ updateGroupAssignments(groupIdToAssignmentsMap, grpId, entry,
deserializer, isStable);
}
}
}
- private static void updateGroupAssignments(
+ private void updateGroupAssignments(
Map<ReplicationGroupId, TokenizedAssignments>
groupIdToAssignmentsMap,
ReplicationGroupId grpId,
Entry entry,
- Function<byte[], Set<Assignment>> deserializer
+ Function<byte[], Set<Assignment>> deserializer,
+ boolean isStable
) {
byte[] value = entry.value();
@@ -262,7 +407,21 @@ public class AssignmentsTracker implements
AssignmentsPlacementDriver {
Set<Assignment> assignmentNodes = deserializer.apply(value);
- groupIdToAssignmentsMap.put(grpId, new
TokenizedAssignmentsImpl(assignmentNodes, entry.revision()));
+ var assignments = new TokenizedAssignmentsImpl(assignmentNodes,
entry.revision());
+
+ groupIdToAssignmentsMap.put(grpId, assignments);
+
+ if (isStable && !assignments.nodes().isEmpty()) {
+ completeNonEmptyAssignmentsFutureIfExists(grpId, assignments);
+ }
+ }
+
+ /**
+  * Removes the waiters' future registered for the given group, if there is one, and completes it with the given
+  * assignments.
+  *
+  * @param grpId Replication group ID.
+  * @param assignments Value to complete the future with; {@code null} is passed when the assignments entry of the
+  *         group was removed (tombstone).
+  */
+ private void completeNonEmptyAssignmentsFutureIfExists(ReplicationGroupId
grpId, @Nullable TokenizedAssignments assignments) {
+ // Remove first, so that the future is completed by at most one thread and new waiters register a fresh one.
+ CompletableFuture<TokenizedAssignments> fut =
nonEmptyAssignmentsFutures.remove(grpId);
+
+ if (fut != null) {
+ fut.complete(assignments);
+ }
 }
private static String collectKeysFromEventAsString(WatchEvent event) {
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
index e398817c642..d80b1f52286 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
@@ -166,7 +166,13 @@ public class PlacementDriverManager implements
IgniteComponent {
nodeProperties
);
- this.assignmentsTracker = new AssignmentsTracker(metastore,
failureProcessor, nodeProperties);
+ this.assignmentsTracker = new AssignmentsTracker(
+ metastore,
+ failureProcessor,
+ nodeProperties,
+ currentDataNodesProvider,
+ zoneIdByTableIdResolver
+ );
this.leaseUpdater = new LeaseUpdater(
nodeName,
@@ -331,6 +337,12 @@ public class PlacementDriverManager implements
IgniteComponent {
return assignmentsTracker.getAssignments(replicationGroupIds,
timestamp);
}
+ @Override
+ public CompletableFuture<List<TokenizedAssignments>>
awaitNonEmptyAssignments(
+ List<? extends ReplicationGroupId> replicationGroupIds,
HybridTimestamp clusterTimeToAwait, long timeoutMillis) {
+ // Pure delegation: the waiting logic lives in the assignments tracker, all arguments are passed through as is.
+ return
assignmentsTracker.awaitNonEmptyAssignments(replicationGroupIds,
clusterTimeToAwait, timeoutMillis);
+ }
+
@Override
public CompletableFuture<ReplicaMeta> awaitPrimaryReplica(
ReplicationGroupId groupId,
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
index ef151987c1a..0f43704ef44 100644
---
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
@@ -144,7 +144,7 @@ public class ActiveActorTest extends
AbstractTopologyAwareGroupServiceTest {
Runnable::run,
mock(MetricManager.class),
zoneId -> completedFuture(Set.of()),
- id -> null
+ zoneId -> null
);
assertThat(placementDriverManager.startAsync(new ComponentContext()),
willCompleteSuccessfully());
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/AssignmentsTrackerTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/AssignmentsTrackerTest.java
new file mode 100644
index 00000000000..ff3588d5f04
--- /dev/null
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/AssignmentsTrackerTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import static java.util.Collections.emptySet;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toSet;
+import static
org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled;
+import static
org.apache.ignite.internal.partitiondistribution.Assignment.forPeer;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.components.SystemPropertiesNodeProperties;
+import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil;
+import
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil;
+import org.apache.ignite.internal.failure.FailureContext;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
+import org.apache.ignite.internal.partitiondistribution.Assignment;
+import org.apache.ignite.internal.partitiondistribution.Assignments;
+import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link AssignmentsTracker}, focused on awaiting non-empty stable assignments.
+ */
+public class AssignmentsTrackerTest extends BaseIgniteAbstractTest {
+    private final ReplicationGroupId groupId0 = colocationEnabled() ? new ZonePartitionId(0, 0) : new TablePartitionId(0, 0);
+    private final ReplicationGroupId groupId1 = colocationEnabled() ? new ZonePartitionId(1, 0) : new TablePartitionId(1, 0);
+
+    /** Data nodes reported for zone 0 by the data nodes provider passed to the tracker. */
+    private Set<String> dataNodes0 = emptySet();
+
+    /** Data nodes reported for any zone other than zone 0. */
+    private Set<String> dataNodes1 = emptySet();
+
+    private MetaStorageManager metaStorageManager;
+
+    private AssignmentsTracker assignmentsTracker;
+
+    /** Starts a standalone meta storage with deployed watches and a tracker on top of it. */
+    @BeforeEach
+    public void setUp() {
+        metaStorageManager = StandaloneMetaStorageManager.create();
+
+        assertThat(metaStorageManager.startAsync(new ComponentContext()), willCompleteSuccessfully());
+
+        assertThat(metaStorageManager.deployWatches(), willCompleteSuccessfully());
+
+        assignmentsTracker = new AssignmentsTracker(
+                metaStorageManager,
+                new TestFailureProcessor(),
+                new SystemPropertiesNodeProperties(),
+                zoneId -> completedFuture(dataNodes(zoneId)),
+                // The table ID is used as the zone ID, so both colocation modes address the same data nodes.
+                id -> id
+        );
+
+        assignmentsTracker.startTrack();
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        assignmentsTracker.stopTrack();
+        assertThat(metaStorageManager.stopAsync(), willCompleteSuccessfully());
+    }
+
+    private Set<String> dataNodes(int zoneId) {
+        return zoneId == 0 ? dataNodes0 : dataNodes1;
+    }
+
+    /** Returns the stable assignments key that matches the type of the given group ID. */
+    private ByteArray assignmentsKey(ReplicationGroupId groupId) {
+        return colocationEnabled()
+                ? ZoneRebalanceUtil.stablePartAssignmentsKey((ZonePartitionId) groupId)
+                : RebalanceUtil.stablePartAssignmentsKey((TablePartitionId) groupId);
+    }
+
+    @Test
+    public void testInitialEmptyAssignmentsWithSuccessfulWaiting() {
+        CompletableFuture<List<TokenizedAssignments>> assignmentsListFuture = assignmentsTracker
+                .awaitNonEmptyAssignments(List.of(groupId0), metaStorageManager.clusterTime().currentSafeTime(), 10_000);
+
+        assertFalse(assignmentsListFuture.isDone());
+
+        metaStorageManager.put(assignmentsKey(groupId0), Assignments.toBytes(Set.of(forPeer("A")), HybridTimestamp.MIN_VALUE.longValue()));
+
+        assertThat(assignmentsListFuture, willCompleteSuccessfully());
+
+        assertEquals(1, assignmentsListFuture.join().get(0).nodes().size());
+    }
+
+    @Test
+    public void testChangeAssignmentsForOneGroupWhileWaitingForAnother() {
+        CompletableFuture<List<TokenizedAssignments>> assignmentsListFuture = assignmentsTracker
+                .awaitNonEmptyAssignments(List.of(groupId0, groupId1), metaStorageManager.clusterTime().currentSafeTime(), 10_000);
+
+        assertFalse(assignmentsListFuture.isDone());
+
+        Set<Assignment> assignmentsForGroup0Initial = Set.of(forPeer("A"), forPeer("B"));
+        Set<Assignment> assignmentsForGroup0Updated = Set.of(forPeer("B"), forPeer("C"));
+        Set<Assignment> assignmentsForGroup1 = Set.of(forPeer("D"), forPeer("E"));
+
+        dataNodes0 = assignmentsForGroup0Initial.stream().map(Assignment::consistentId).collect(toSet());
+
+        // Group 0 gets its first assignments while group 1 is still empty. Without this write the "change" in the test
+        // name never happens: group 0 would go straight from empty to the updated value. The write is awaited so that
+        // it is ordered before the update of the same key below.
+        assertThat(
+                metaStorageManager.put(
+                        assignmentsKey(groupId0),
+                        Assignments.toBytes(assignmentsForGroup0Initial, HybridTimestamp.MIN_VALUE.longValue())
+                ),
+                willCompleteSuccessfully()
+        );
+
+        // Group 1 has no assignments yet, so the combined future must keep waiting.
+        assertFalse(assignmentsListFuture.isDone());
+
+        metaStorageManager.put(
+                assignmentsKey(groupId0),
+                Assignments.toBytes(assignmentsForGroup0Updated, HybridTimestamp.MIN_VALUE.longValue())
+        );
+
+        metaStorageManager.put(
+                assignmentsKey(groupId1),
+                Assignments.toBytes(assignmentsForGroup1, HybridTimestamp.MIN_VALUE.longValue())
+        );
+
+        assertThat(assignmentsListFuture, willCompleteSuccessfully());
+
+        // The result must contain the most recent value for group 0, not the one that was current when its wait ended.
+        assertEquals(assignmentsForGroup0Updated, assignmentsListFuture.join().get(0).nodes());
+        assertEquals(assignmentsForGroup1, assignmentsListFuture.join().get(1).nodes());
+    }
+
+    /** Failure processor stub that reports every failure as not handled. */
+    private static class TestFailureProcessor implements FailureProcessor {
+        @Override
+        public boolean process(FailureContext failureCtx) {
+            return false;
+        }
+    }
+}
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java
index 9594cf62364..eb48600c091 100644
---
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java
@@ -210,7 +210,9 @@ public class LeaseNegotiationTest extends
BaseIgniteAbstractTest {
assignmentsTracker = new AssignmentsTracker(
metaStorageManager,
mock(FailureProcessor.class),
- new SystemPropertiesNodeProperties()
+ new SystemPropertiesNodeProperties(),
+ zoneId -> completedFuture(Set.of()),
+ zoneId -> null
);
assignmentsTracker.startTrack();
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
index d71484b6d05..22278e548a7 100644
---
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
@@ -185,7 +185,9 @@ public class LeaseUpdaterTest extends
BaseIgniteAbstractTest {
assignmentsTracker = new AssignmentsTracker(
metaStorageManager,
mock(FailureProcessor.class),
- new SystemPropertiesNodeProperties()
+ new SystemPropertiesNodeProperties(),
+ zoneId -> completedFuture(Set.of()),
+ zoneId -> null
);
assignmentsTracker.startTrack();
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
index a06de408aaf..08b77bb1e84 100644
---
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
@@ -956,7 +956,9 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
return new AssignmentsTracker(
metastore,
mock(FailureProcessor.class),
- new SystemPropertiesNodeProperties()
+ new SystemPropertiesNodeProperties(),
+ zoneId -> completedFuture(Set.of("A")),
+ zoneId -> null
);
}
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 07995a28d42..737109f25e5 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -594,7 +594,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
threadPoolsManager.commonScheduler(),
metricManager,
zoneId -> completedFuture(Set.of()),
- id -> null
+ zoneId -> null
);
ReplicaManager replicaMgr = new ReplicaManager(
diff --git
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/AssignmentsTestUtils.java
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/AssignmentsTestUtils.java
index b32b92a0a8a..df4bfa4e085 100644
---
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/AssignmentsTestUtils.java
+++
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/AssignmentsTestUtils.java
@@ -75,34 +75,4 @@ public class AssignmentsTestUtils {
return totalPartitionSize == zone.partitions() * zone.replicas();
}, 10_000));
}
-
- /**
- * Returns a future that completes when the Default Zone's primary
replicas have been elected.
- */
- // TODO: remove this method after
https://issues.apache.org/jira/browse/IGNITE-25283 has been fixed.
- public static void awaitAssignmentsStabilizationOnDefaultZone(Ignite node)
throws InterruptedException {
- IgniteImpl igniteImpl = unwrapIgniteImpl(node);
-
- Catalog catalog =
igniteImpl.catalogManager().catalog(igniteImpl.catalogManager().latestCatalogVersion());
-
- CatalogZoneDescriptor defaultZone = catalog.defaultZone();
- assertNotNull(defaultZone);
-
- assertTrue(waitForCondition(() -> {
- HybridTimestamp timestamp = igniteImpl.clock().now();
-
- int totalPartitionSize = 0;
-
- for (int p = 0; p < defaultZone.partitions(); p++) {
- CompletableFuture<TokenizedAssignments> assignmentsFuture =
igniteImpl.placementDriver()
- .getAssignments(new ZonePartitionId(defaultZone.id(),
p), timestamp);
-
- assertThat(assignmentsFuture, willCompleteSuccessfully());
-
- totalPartitionSize += assignmentsFuture.join().nodes().size();
- }
-
- return totalPartitionSize == defaultZone.partitions() *
defaultZone.replicas();
- }, 10_000));
- }
}
diff --git a/modules/sql-engine/build.gradle b/modules/sql-engine/build.gradle
index 0c335b63cca..3aa7c06e7fd 100644
--- a/modules/sql-engine/build.gradle
+++ b/modules/sql-engine/build.gradle
@@ -139,8 +139,6 @@ dependencies {
integrationTestImplementation project(':ignite-network-api')
integrationTestImplementation project(':ignite-sql-engine-api')
integrationTestImplementation project(':ignite-eventlog')
- integrationTestImplementation project(':ignite-placement-driver-api') //
TODO: IGNITE-25283 - remove.
- integrationTestImplementation project(':ignite-partition-distribution') //
TODO: IGNITE-25283 - remove.
integrationTestImplementation testFixtures(project(':ignite-core'))
integrationTestImplementation testFixtures(project(':ignite-schema'))
integrationTestImplementation testFixtures(project(':ignite-sql-engine'))
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItColocatedDataTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItColocatedDataTest.java
index 47e82d452c8..94d0913c7f2 100644
---
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItColocatedDataTest.java
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItColocatedDataTest.java
@@ -17,25 +17,9 @@
package org.apache.ignite.internal.sql.engine;
-import static java.util.stream.Collectors.toList;
-import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static
org.apache.ignite.internal.lang.IgniteSystemProperties.COLOCATION_FEATURE_FLAG;
-import static
org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled;
-import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
-import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.not;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.IntStream;
-import org.apache.ignite.internal.app.IgniteImpl;
-import org.apache.ignite.internal.catalog.Catalog;
-import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments;
-import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
import org.apache.ignite.internal.sql.engine.util.QueryChecker;
import org.apache.ignite.internal.testframework.WithSystemProperty;
@@ -49,8 +33,6 @@ import org.junit.jupiter.params.provider.EnumSource;
public class ItColocatedDataTest extends BaseSqlIntegrationTest {
@BeforeAll
public static void beforeTestsStarted() throws InterruptedException {
- waitForDefaultZoneAssignments();
-
//noinspection ConcatenationWithEmptyString
sqlScript(""
+ "CREATE TABLE T1 (id INT PRIMARY KEY, c1 INT);"
@@ -141,34 +123,4 @@ public class ItColocatedDataTest extends
BaseSqlIntegrationTest {
this.disabledRules = disabledRules;
}
}
-
- /**
- * Waits for initial default zone assignments to appear.
- */
- // TODO: remove this method after
https://issues.apache.org/jira/browse/IGNITE-25283 has been fixed.
- private static void waitForDefaultZoneAssignments() throws
InterruptedException {
- if (!colocationEnabled()) {
- return;
- }
-
- IgniteImpl nodeImpl = unwrapIgniteImpl(CLUSTER.aliveNode());
-
- Catalog catalog =
nodeImpl.catalogManager().catalog(nodeImpl.catalogManager().latestCatalogVersion());
-
- CatalogZoneDescriptor defaultZone = catalog.defaultZone();
-
- List<ZonePartitionId> partitionIds = IntStream.range(0,
defaultZone.partitions())
- .mapToObj(partId -> new ZonePartitionId(defaultZone.id(),
partId))
- .collect(toList());
-
- assertTrue(waitForCondition(() -> {
- HybridTimestamp now = nodeImpl.clock().now();
-
- CompletableFuture<List<TokenizedAssignments>> assignmentsFuture =
nodeImpl.placementDriver().getAssignments(partitionIds, now);
-
- assertThat(assignmentsFuture, willCompleteSuccessfully());
-
- return assignmentsFuture.join().stream().noneMatch(assignments ->
assignments.nodes().isEmpty());
- }, 15_000));
- }
}
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItLimitOffsetTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItLimitOffsetTest.java
index 23ccdd9c191..b0a21bb5f14 100644
---
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItLimitOffsetTest.java
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItLimitOffsetTest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.sql.engine;
-import static
org.apache.ignite.internal.AssignmentsTestUtils.awaitAssignmentsStabilization;
import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -52,11 +51,6 @@ public class ItLimitOffsetTest extends
BaseSqlIntegrationTest {
public void testInvalidLimitOffset() throws InterruptedException {
BigDecimal moreThanUpperLong = new BigDecimal(Long.MAX_VALUE).add(new
BigDecimal(1));
- // TODO: https://issues.apache.org/jira/browse/IGNITE-25283 Remove
- // In case of empty assignments, SQL engine will throw "Mandatory
nodes was excluded from mapping: []".
- // In order to eliminate this, assignments stabilization is needed,
otherwise test may fail. Not related to colocation.
- awaitAssignmentsStabilization(CLUSTER.aliveNode(), TABLE_NAME);
-
// cache the plan with concrete type param
igniteSql().execute(null, "SELECT * FROM test OFFSET ? ROWS", new
BigDecimal(Long.MAX_VALUE));
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionDistributionProviderImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionDistributionProviderImpl.java
index 7a4073a81d5..219279b0ba4 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionDistributionProviderImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionDistributionProviderImpl.java
@@ -46,6 +46,11 @@ import
org.apache.ignite.internal.systemview.api.SystemViewManager;
/** Execution nodes information provider. */
public class ExecutionDistributionProviderImpl implements
ExecutionDistributionProvider {
+ // TODO https://issues.apache.org/jira/browse/IGNITE-26651
+ // TODO https://issues.apache.org/jira/browse/IGNITE-26652
+ /** Non-empty assignments await timeout. */
+ public static final int AWAIT_NON_EMPTY_ASSIGNMENTS_TIMEOUT_MILLIS =
30_000;
+
private static final IgniteLogger LOG =
Loggers.forClass(ExecutionDistributionProviderImpl.class);
private final PlacementDriver placementDriver;
private final SystemViewManager systemViewManager;
@@ -156,9 +161,10 @@ public class ExecutionDistributionProviderImpl implements
ExecutionDistributionP
List<ReplicationGroupId> replicationGroupIds,
HybridTimestamp operationTime
) {
- return placementDriver.getAssignments(
+ return placementDriver.awaitNonEmptyAssignments(
replicationGroupIds,
- operationTime
+ operationTime,
+ AWAIT_NON_EMPTY_ASSIGNMENTS_TIMEOUT_MILLIS
);
}
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/metrics/ItTableMetricsTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/metrics/ItTableMetricsTest.java
index 2982523a157..7d18249ec38 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/metrics/ItTableMetricsTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/metrics/ItTableMetricsTest.java
@@ -19,9 +19,7 @@ package org.apache.ignite.internal.table.metrics;
import static java.util.List.of;
import static java.util.stream.Collectors.toList;
-import static
org.apache.ignite.internal.AssignmentsTestUtils.awaitAssignmentsStabilizationOnDefaultZone;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
-import static
org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled;
import static
org.apache.ignite.internal.table.metrics.TableMetricSource.RO_READS;
import static
org.apache.ignite.internal.table.metrics.TableMetricSource.RW_READS;
import static
org.apache.ignite.internal.table.metrics.TableMetricSource.WRITES;
@@ -69,10 +67,6 @@ public class ItTableMetricsTest extends
ClusterPerClassIntegrationTest {
sql("CREATE INDEX IF NOT EXISTS " + SORTED_IDX + " ON PUBLIC." +
TABLE_NAME + " USING SORTED (id)");
sql("CREATE INDEX IF NOT EXISTS " + HASH_IDX + " ON PUBLIC." +
TABLE_NAME + " USING HASH (val)");
-
- if (colocationEnabled()) {
- awaitAssignmentsStabilizationOnDefaultZone(CLUSTER.aliveNode());
- }
}
/**
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTransactionMetricsTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTransactionMetricsTest.java
index a5923f777e1..20775b8705a 100644
---
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTransactionMetricsTest.java
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTransactionMetricsTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.tx;
import static java.util.stream.Collectors.toSet;
-import static
org.apache.ignite.internal.AssignmentsTestUtils.awaitAssignmentsStabilizationOnDefaultZone;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
import static
org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled;
@@ -69,10 +68,6 @@ public class ItTransactionMetricsTest extends
ClusterPerClassIntegrationTest {
@BeforeAll
void createTable() throws Exception {
sql("CREATE TABLE " + TABLE_NAME + " (id INT PRIMARY KEY, val
VARCHAR)");
-
- if (colocationEnabled()) {
- awaitAssignmentsStabilizationOnDefaultZone(CLUSTER.aliveNode());
- }
}
/**