This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f98f98e91b IGNITE-22467 Add getAssignments method to the
PlacementDriver (#3947)
f98f98e91b is described below
commit f98f98e91be5949dd018d5442af1f503e3596211
Author: Alexander Lapin <[email protected]>
AuthorDate: Fri Jun 21 19:19:36 2024 +0300
IGNITE-22467 Add getAssignments method to the PlacementDriver (#3947)
---
.../internal/affinity/TokenizedAssignments.java} | 25 +-
.../affinity/TokenizedAssignmentsImpl.java | 62 +++++
modules/client-handler/build.gradle | 1 +
.../ignite/client/handler/FakePlacementDriver.java | 12 +-
modules/index/build.gradle | 1 +
.../ignite/internal/index/TestPlacementDriver.java | 11 +
.../replicator/utils/TestPlacementDriver.java | 10 +
modules/placement-driver-api/build.gradle | 2 +
.../AssignmentsPlacementDriver.java | 43 ++++
...cementDriver.java => LeasePlacementDriver.java} | 16 +-
.../internal/placementdriver/PlacementDriver.java | 52 +----
.../placementdriver/TestPlacementDriver.java | 9 +
.../placementdriver/AssignmentsTracker.java | 34 ++-
.../internal/placementdriver/LeaseUpdater.java | 13 +-
.../placementdriver/PlacementDriverManager.java | 65 +++++-
.../placementdriver/leases/LeaseTracker.java | 5 +-
.../placementdriver/LeaseNegotiationTest.java | 3 +-
.../internal/placementdriver/LeaseUpdaterTest.java | 3 +-
.../placementdriver/PlacementDriverTest.java | 255 +++++++++++++++++++--
.../wrappers/DelegatingPlacementDriver.java | 9 +
20 files changed, 519 insertions(+), 112 deletions(-)
diff --git a/modules/placement-driver-api/build.gradle
b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/TokenizedAssignments.java
similarity index 54%
copy from modules/placement-driver-api/build.gradle
copy to
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/TokenizedAssignments.java
index 4a6385f667..887f841d1f 100644
--- a/modules/placement-driver-api/build.gradle
+++
b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/TokenizedAssignments.java
@@ -15,21 +15,18 @@
* limitations under the License.
*/
-apply from: "$rootDir/buildscripts/java-core.gradle"
-apply from: "$rootDir/buildscripts/publishing.gradle"
-apply from: "$rootDir/buildscripts/java-junit5.gradle"
-apply from: "$rootDir/buildscripts/java-test-fixtures.gradle"
+package org.apache.ignite.internal.affinity;
-dependencies {
- annotationProcessor project(":ignite-network-annotation-processor")
+import java.io.Serializable;
+import java.util.Set;
- implementation project(':ignite-core')
- implementation project(':ignite-network-api')
- implementation libs.jetbrains.annotations
+/**
+ * Set of nodes along with associated token that is guaranteed to be changed
if the set was changed.
+ */
+public interface TokenizedAssignments extends Serializable {
+ /** Returns a set of nodes, represented by this assignments instance. */
+ Set<Assignment> nodes();
- testFixturesImplementation project(':ignite-core')
- testFixturesImplementation project(':ignite-network-api')
- testFixturesImplementation libs.jetbrains.annotations
+ /** Returns a token associated with given assignments that is guaranteed
to be changed if assignments were changed. */
+ long token();
}
-
-description = 'ignite-placement-driver-api'
diff --git
a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/TokenizedAssignmentsImpl.java
b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/TokenizedAssignmentsImpl.java
new file mode 100644
index 0000000000..d3d7db5d82
--- /dev/null
+++
b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/TokenizedAssignmentsImpl.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import static java.util.Collections.unmodifiableSet;
+
+import java.util.Set;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Set of nodes along with associated token that is guaranteed to be changed
if the set was changed.
+ */
+public class TokenizedAssignmentsImpl implements TokenizedAssignments {
+ private static final long serialVersionUID = -6960630542063056327L;
+
+ @IgniteToStringInclude
+ private final Set<Assignment> nodes;
+
+ private final long token;
+
+ /**
+ * The constructor.
+ *
+ * @param nodes Set of nodes.
+ * @param token Token.
+ */
+ public TokenizedAssignmentsImpl(Set<Assignment> nodes, long token) {
+ this.nodes = nodes;
+ this.token = token;
+ }
+
+ @Override
+ public Set<Assignment> nodes() {
+ return unmodifiableSet(nodes);
+ }
+
+ @Override
+ public long token() {
+ return token;
+ }
+
+ @Override
+ public String toString() {
+ return S.toString(this);
+ }
+}
diff --git a/modules/client-handler/build.gradle
b/modules/client-handler/build.gradle
index ad81fcb764..ec68884b44 100644
--- a/modules/client-handler/build.gradle
+++ b/modules/client-handler/build.gradle
@@ -90,6 +90,7 @@ dependencies {
testFixturesImplementation project(':ignite-placement-driver-api')
testFixturesImplementation project(':ignite-catalog')
testFixturesImplementation project(':ignite-security-api')
+ testFixturesImplementation project(':ignite-affinity')
testFixturesImplementation libs.mockito.junit
}
diff --git
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
index 5853de37f1..62c0fc17e8 100644
---
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
+++
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
@@ -17,6 +17,7 @@
package org.apache.ignite.client.handler;
+import static java.util.concurrent.CompletableFuture.failedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import java.util.ArrayList;
@@ -24,6 +25,7 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
@@ -89,7 +91,7 @@ public class FakePlacementDriver extends
AbstractEventProducer<PrimaryReplicaEve
TablePartitionId id = (TablePartitionId) groupId;
return returnError
- ? CompletableFuture.failedFuture(new
RuntimeException("FakePlacementDriver expected error"))
+ ? failedFuture(new RuntimeException("FakePlacementDriver
expected error"))
:
CompletableFuture.completedFuture(primaryReplicas.get(id.partitionId()));
}
@@ -103,6 +105,14 @@ public class FakePlacementDriver extends
AbstractEventProducer<PrimaryReplicaEve
return nullCompletedFuture();
}
+ @Override
+ public CompletableFuture<TokenizedAssignments> getAssignments(
+ ReplicationGroupId replicationGroupId,
+ HybridTimestamp clusterTimeToAwait
+ ) {
+ return failedFuture(new
UnsupportedOperationException("getAssignments() is not supported in
FakePlacementDriver yet."));
+ }
+
private static ReplicaMeta getReplicaMeta(String leaseholder, long
leaseStartTime) {
//noinspection serial
return new ReplicaMeta() {
diff --git a/modules/index/build.gradle b/modules/index/build.gradle
index 79e031cfa3..a2b0a66c47 100644
--- a/modules/index/build.gradle
+++ b/modules/index/build.gradle
@@ -40,6 +40,7 @@ dependencies {
implementation project(':ignite-cluster-management')
implementation project(':ignite-low-watermark')
implementation project(':ignite-partition-replicator')
+ implementation project(':ignite-affinity')
implementation libs.jetbrains.annotations
testImplementation(testFixtures(project(':ignite-configuration')))
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
index fe207e7588..fb81f9cee2 100644
---
a/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
@@ -17,10 +17,13 @@
package org.apache.ignite.internal.index;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
@@ -54,6 +57,14 @@ class TestPlacementDriver extends
AbstractEventProducer<PrimaryReplicaEvent, Pri
throw new UnsupportedOperationException();
}
+ @Override
+ public CompletableFuture<TokenizedAssignments> getAssignments(
+ ReplicationGroupId replicationGroupId,
+ HybridTimestamp clusterTimeToAwait
+ ) {
+ return failedFuture(new
UnsupportedOperationException("getAssignments() is not supported in
FakePlacementDriver yet."));
+ }
+
CompletableFuture<Void> setPrimaryReplicaMeta(
long causalityToken,
TablePartitionId replicaId,
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
index e5c6615bf4..51b207f66b 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
@@ -17,10 +17,12 @@
package org.apache.ignite.internal.partition.replicator.utils;
+import static java.util.concurrent.CompletableFuture.failedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
@@ -64,6 +66,14 @@ public class TestPlacementDriver extends
AbstractEventProducer<PrimaryReplicaEve
return nullCompletedFuture();
}
+ @Override
+ public CompletableFuture<TokenizedAssignments> getAssignments(
+ ReplicationGroupId replicationGroupId,
+ HybridTimestamp clusterTimeToAwait
+ ) {
+ return failedFuture(new
UnsupportedOperationException("getAssignments() is not supported in
FakePlacementDriver yet."));
+ }
+
private CompletableFuture<ReplicaMeta> getPrimaryReplicaMeta() {
if (primary == null) {
throw new IllegalStateException("Primary replica is not defined in
test PlacementDriver");
diff --git a/modules/placement-driver-api/build.gradle
b/modules/placement-driver-api/build.gradle
index 4a6385f667..a446f980f1 100644
--- a/modules/placement-driver-api/build.gradle
+++ b/modules/placement-driver-api/build.gradle
@@ -25,10 +25,12 @@ dependencies {
implementation project(':ignite-core')
implementation project(':ignite-network-api')
+ implementation project(':ignite-affinity')
implementation libs.jetbrains.annotations
testFixturesImplementation project(':ignite-core')
testFixturesImplementation project(':ignite-network-api')
+ testFixturesImplementation project(':ignite-affinity')
testFixturesImplementation libs.jetbrains.annotations
}
diff --git
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java
new file mode 100644
index 0000000000..641ab117ec
--- /dev/null
+++
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+
+/**
+ * Service that provides an ability to await and retrieve assignments for
replication groups.
+ * It's not guaranteed that retrieved assignments are ready to process
requests, meaning that corresponding replicas
+ * may not be started yet, or may be stopped at the next tick. On the stable
replication group, assignments however
+ * may be interpreted as replica hosts to process the requests.
+ */
+public interface AssignmentsPlacementDriver {
+
+ /**
+ * Returns the future with either newest available tokenized assignments
for the specified replication group id or {@code null} if
+ * there are no such assignments. The future will be completed after
clusterTime (meta storage safe time) will become greater or equal
+ * to the clusterTimeToAwait parameter.
+ *
+ * @param replicationGroupId Replication group Id.
+ * @param clusterTimeToAwait Cluster time to await.
+ * @return Tokenized assignments.
+ */
+ CompletableFuture<TokenizedAssignments> getAssignments(ReplicationGroupId
replicationGroupId, HybridTimestamp clusterTimeToAwait);
+}
diff --git
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/LeasePlacementDriver.java
similarity index 85%
copy from
modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
copy to
modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/LeasePlacementDriver.java
index f06e28a8b3..99f6d41822 100644
---
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
+++
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/LeasePlacementDriver.java
@@ -35,14 +35,14 @@ import
org.apache.ignite.internal.replicator.ReplicationGroupId;
* replication group.</p>
*/
// TODO: https://issues.apache.org/jira/browse/IGNITE-20646 Consider using
CLOCK_SKEW unaware await/getPrimaryReplica()
-public interface PlacementDriver extends EventProducer<PrimaryReplicaEvent,
PrimaryReplicaEventParameters> {
+public interface LeasePlacementDriver extends
EventProducer<PrimaryReplicaEvent, PrimaryReplicaEventParameters> {
/**
* Returns a future for the primary replica for the specified replication
group whose expiration time (the right border of the
- * corresponding lease interval) is greater than or equal to the
(timestamp passed as a parameter - CLOCK_SKEW).
- * Please pay attention that there are no restriction on the lease start
time (left border),
- * it can either be less or greater than or equal to proposed timestamp.
- * Given method will await for an appropriate primary replica appearance
if there's no already existing one. If the current lease
- * is held by a node that is already not in a cluster, the future will be
completed after the lease is transferred to another node.
+ * corresponding lease interval) is greater than or equal to the
(timestamp passed as a parameter - CLOCK_SKEW). Please pay attention
+ * that there are no restriction on the lease start time (left border), it
can either be less or greater than or equal to proposed
+ * timestamp. Given method will await for an appropriate primary replica
appearance if there's no already existing one. If the current
+ * lease is held by a node that is already not in a cluster, the future
will be completed after the lease is transferred to another
+ * node.
*
* @param groupId Replication group id.
* @param timestamp CLOCK_SKEW aware timestamp reference value.
@@ -74,8 +74,8 @@ public interface PlacementDriver extends
EventProducer<PrimaryReplicaEvent, Prim
* Returns a future that completes when all expiration event {@link
PrimaryReplicaEvent#PRIMARY_REPLICA_EXPIRED} listeners of previous
* primary complete.
*
- * @param grpId Replication group id.
+ * @param replicationGroupId Replication group id.
* @return Future.
*/
- CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId grpId);
+ CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId
replicationGroupId);
}
diff --git
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
index f06e28a8b3..b2d6425e17 100644
---
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
+++
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
@@ -17,16 +17,11 @@
package org.apache.ignite.internal.placementdriver;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import org.apache.ignite.internal.event.EventProducer;
import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
-import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
/**
- * Service that provides an ability to await and retrieve primary replicas for
replication groups.
+ * Service that provides an ability to await and retrieve primary replicas and
assignments for replication groups.
*
* <p>Notes: If during recovery, the component needs to perform actions
depending on whether the primary replica for some replication group
* is a local node, then it needs to use {@link
#getPrimaryReplica(ReplicationGroupId, HybridTimestamp)}. Then compare the
local node with
@@ -34,48 +29,5 @@ import
org.apache.ignite.internal.replicator.ReplicationGroupId;
* {@link ReplicaMeta#getExpirationTime()}. And only then can we consider that
the local node is the primary replica for the requested
* replication group.</p>
*/
-// TODO: https://issues.apache.org/jira/browse/IGNITE-20646 Consider using
CLOCK_SKEW unaware await/getPrimaryReplica()
-public interface PlacementDriver extends EventProducer<PrimaryReplicaEvent,
PrimaryReplicaEventParameters> {
- /**
- * Returns a future for the primary replica for the specified replication
group whose expiration time (the right border of the
- * corresponding lease interval) is greater than or equal to the
(timestamp passed as a parameter - CLOCK_SKEW).
- * Please pay attention that there are no restriction on the lease start
time (left border),
- * it can either be less or greater than or equal to proposed timestamp.
- * Given method will await for an appropriate primary replica appearance
if there's no already existing one. If the current lease
- * is held by a node that is already not in a cluster, the future will be
completed after the lease is transferred to another node.
- *
- * @param groupId Replication group id.
- * @param timestamp CLOCK_SKEW aware timestamp reference value.
- * @param timeout How long to wait before completing exceptionally with a
TimeoutException, in units of unit.
- * @param unit A TimeUnit determining how to interpret the timeout
parameter.
- * @return Primary replica future.
- * @throws PrimaryReplicaAwaitTimeoutException If primary replica await
timed out.
- * @throws PrimaryReplicaAwaitException If primary replica await failed
with any other reason except timeout.
- */
- CompletableFuture<ReplicaMeta> awaitPrimaryReplica(
- ReplicationGroupId groupId,
- HybridTimestamp timestamp,
- long timeout,
- TimeUnit unit
- );
-
- /**
- * Same as {@link #awaitPrimaryReplica(ReplicationGroupId,
HybridTimestamp, long, TimeUnit)} despite the fact that given method await
- * logic is bounded. It will wait for a primary replica for a reasonable
period of time, and complete a future with null if a matching
- * lease isn't found. Generally speaking reasonable here means enough for
distribution across cluster nodes.
- *
- * @param replicationGroupId Replication group id.
- * @param timestamp CLOCK_SKEW aware timestamp reference value.
- * @return Primary replica future.
- */
- CompletableFuture<ReplicaMeta> getPrimaryReplica(ReplicationGroupId
replicationGroupId, HybridTimestamp timestamp);
-
- /**
- * Returns a future that completes when all expiration event {@link
PrimaryReplicaEvent#PRIMARY_REPLICA_EXPIRED} listeners of previous
- * primary complete.
- *
- * @param grpId Replication group id.
- * @return Future.
- */
- CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId grpId);
+public interface PlacementDriver extends LeasePlacementDriver,
AssignmentsPlacementDriver {
}
diff --git
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
index 440cbbb309..3392cef934 100644
---
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
+++
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
@@ -24,6 +24,7 @@ import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
@@ -83,6 +84,14 @@ public class TestPlacementDriver extends
AbstractEventProducer<PrimaryReplicaEve
return super.fireEvent(event, parameters);
}
+ @Override
+ public CompletableFuture<TokenizedAssignments> getAssignments(
+ ReplicationGroupId replicationGroupId,
+ HybridTimestamp clusterTimeToAwait
+ ) {
+ return failedFuture(new
UnsupportedOperationException("getAssignments() is not supported in
FakePlacementDriver yet."));
+ }
+
private CompletableFuture<ReplicaMeta> getReplicaMetaFuture() {
try {
return completedFuture(primaryReplicaSupplier.get());
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
index 11e4da4de6..b71a9f7f4c 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.placementdriver;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.StringUtils.incrementLastChar;
import java.nio.charset.StandardCharsets;
@@ -29,6 +30,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.affinity.Assignments;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
+import org.apache.ignite.internal.affinity.TokenizedAssignmentsImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -40,23 +44,30 @@ import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
/**
* The class tracks assignment of all replication groups.
*/
-public class AssignmentsTracker {
+public class AssignmentsTracker implements AssignmentsPlacementDriver {
/** Ignite logger. */
private static final IgniteLogger LOG =
Loggers.forClass(AssignmentsTracker.class);
+ // TODO Not sure whether it should be instantiated here or propagated from
PDM.
+ // TODO Use it on stop, etc.
+ /** Busy lock to linearize service public API calls and service stop. */
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
/** Meta storage manager. */
private final MetaStorageManager msManager;
/** Map replication group id to assignment nodes. */
- private final Map<ReplicationGroupId, Set<Assignment>> groupAssignments;
+ private final Map<ReplicationGroupId, TokenizedAssignments>
groupAssignments;
/** Assignment Meta storage watch listener. */
private final AssignmentsListener assignmentsListener;
+
/**
* The constructor.
*
@@ -96,9 +107,9 @@ public class AssignmentsTracker {
TablePartitionId grpId =
TablePartitionId.fromString(strKey);
- Set<Assignment> assignments =
Assignments.fromBytes(entry.value()).nodes();
+ Set<Assignment> assignmentNodes =
Assignments.fromBytes(entry.value()).nodes();
- groupAssignments.put(grpId, assignments);
+ groupAssignments.put(grpId, new
TokenizedAssignmentsImpl(assignmentNodes, entry.revision()));
}
}
}).whenComplete((res, ex) -> {
@@ -117,12 +128,23 @@ public class AssignmentsTracker {
msManager.unregisterWatch(assignmentsListener);
}
+ @Override
+ public CompletableFuture<TokenizedAssignments> getAssignments(
+ ReplicationGroupId replicationGroupId,
+ HybridTimestamp clusterTimeToAwait
+ ) {
+ return msManager
+ .clusterTime()
+ .waitFor(clusterTimeToAwait)
+ .thenApply(ignored -> inBusyLock(busyLock, () ->
assignments().get(replicationGroupId)));
+ }
+
/**
* Gets assignments.
*
* @return Map replication group id to its assignment.
*/
- public Map<ReplicationGroupId, Set<Assignment>> assignments() {
+ public Map<ReplicationGroupId, TokenizedAssignments> assignments() {
return groupAssignments;
}
@@ -151,7 +173,7 @@ public class AssignmentsTracker {
groupAssignments.remove(replicationGrpId);
} else {
Set<Assignment> newAssignments =
Assignments.fromBytes(entry.value()).nodes();
- groupAssignments.put(replicationGrpId, newAssignments);
+ groupAssignments.put(replicationGrpId, new
TokenizedAssignmentsImpl(newAssignments, entry.revision()));
}
}
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
index 3ed175c099..8d77d1fff4 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
@@ -35,6 +35,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -126,6 +127,7 @@ public class LeaseUpdater {
* @param topologyService Topology service.
* @param leaseTracker Lease tracker.
* @param clockService Clock service.
+ * @param assignmentsTracker Assignments tracker.
*/
LeaseUpdater(
String nodeName,
@@ -133,7 +135,8 @@ public class LeaseUpdater {
MetaStorageManager msManager,
LogicalTopologyService topologyService,
LeaseTracker leaseTracker,
- ClockService clockService
+ ClockService clockService,
+ AssignmentsTracker assignmentsTracker
) {
this.nodeName = nodeName;
this.clusterService = clusterService;
@@ -142,7 +145,7 @@ public class LeaseUpdater {
this.clockService = clockService;
this.longLeaseInterval =
IgniteSystemProperties.getLong("IGNITE_LONG_LEASE", 120_000);
- this.assignmentsTracker = new AssignmentsTracker(msManager);
+ this.assignmentsTracker = assignmentsTracker;
this.topologyTracker = new TopologyTracker(topologyService);
this.updater = new Updater();
@@ -342,7 +345,7 @@ public class LeaseUpdater {
Map<ReplicationGroupId, Boolean> toBeNegotiated = new HashMap<>();
Map<ReplicationGroupId, Lease> renewedLeases = new
HashMap<>(leasesCurrent.leaseByGroupId());
- Map<ReplicationGroupId, Set<Assignment>> currentAssignments =
assignmentsTracker.assignments();
+ Map<ReplicationGroupId, TokenizedAssignments> currentAssignments =
assignmentsTracker.assignments();
Set<ReplicationGroupId> currentAssignmentsReplicationGroupIds =
currentAssignments.keySet();
// Remove all expired leases that are no longer present in
assignments.
@@ -352,9 +355,9 @@ public class LeaseUpdater {
int currentAssignmentsSize = currentAssignments.size();
int activeLeasesCount = 0;
- for (Map.Entry<ReplicationGroupId, Set<Assignment>> entry :
currentAssignments.entrySet()) {
+ for (Map.Entry<ReplicationGroupId, TokenizedAssignments> entry :
currentAssignments.entrySet()) {
ReplicationGroupId grpId = entry.getKey();
- Set<Assignment> assignments = entry.getValue();
+ Set<Assignment> assignments = entry.getValue().nodes();
Lease lease = leaseTracker.getLease(grpId);
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
index 6367931908..a798c7a72a 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
@@ -23,11 +23,15 @@ import static
org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import org.apache.ignite.internal.event.EventListener;
import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -36,6 +40,8 @@ import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
+import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite.internal.placementdriver.leases.LeaseTracker;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.RaftManager;
@@ -89,6 +95,10 @@ public class PlacementDriverManager implements
IgniteComponent {
/** Meta Storage manager. */
private final MetaStorageManager metastore;
+ private final AssignmentsTracker assignmentsTracker;
+
+ private final PlacementDriver placementDriver;
+
/**
* Constructor.
*
@@ -124,14 +134,19 @@ public class PlacementDriverManager implements
IgniteComponent {
this.leaseTracker = new LeaseTracker(metastore,
clusterService.topologyService(), clockService);
+ this.assignmentsTracker = new AssignmentsTracker(metastore);
+
this.leaseUpdater = new LeaseUpdater(
nodeName,
clusterService,
metastore,
logicalTopologyService,
leaseTracker,
- clockService
+ clockService,
+ assignmentsTracker
);
+
+ this.placementDriver = createPlacementDriver();
}
@Override
@@ -241,7 +256,7 @@ public class PlacementDriverManager implements
IgniteComponent {
/** Returns placement driver service. */
public PlacementDriver placementDriver() {
- return leaseTracker;
+ return placementDriver;
}
private void recoverInternalComponentsBusy() {
@@ -253,4 +268,50 @@ public class PlacementDriverManager implements
IgniteComponent {
leaseTracker.startTrack(recoveryRevision);
}
+
+ private PlacementDriver createPlacementDriver() {
+ return new PlacementDriver() {
+ @Override
+ public CompletableFuture<TokenizedAssignments> getAssignments(
+ ReplicationGroupId replicationGroupId,
+ HybridTimestamp timestamp
+ ) {
+ return assignmentsTracker.getAssignments(replicationGroupId,
timestamp);
+ }
+
+ @Override
+ public CompletableFuture<ReplicaMeta> awaitPrimaryReplica(
+ ReplicationGroupId groupId,
+ HybridTimestamp timestamp,
+ long timeout,
+ TimeUnit unit) {
+ return leaseTracker.awaitPrimaryReplica(
+ groupId,
+ timestamp,
+ timeout,
+ unit
+ );
+ }
+
+ @Override
+ public CompletableFuture<ReplicaMeta>
getPrimaryReplica(ReplicationGroupId replicationGroupId, HybridTimestamp
timestamp) {
+ return leaseTracker.getPrimaryReplica(replicationGroupId,
timestamp);
+ }
+
+ @Override
+ public CompletableFuture<Void>
previousPrimaryExpired(ReplicationGroupId replicationGroupId) {
+ return leaseTracker.previousPrimaryExpired(replicationGroupId);
+ }
+
+ @Override
+ public void listen(PrimaryReplicaEvent evt, EventListener<?
extends PrimaryReplicaEventParameters> listener) {
+ leaseTracker.listen(evt, listener);
+ }
+
+ @Override
+ public void removeListener(PrimaryReplicaEvent evt,
EventListener<? extends PrimaryReplicaEventParameters> listener) {
+ leaseTracker.removeListener(evt, listener);
+ }
+ };
+ }
}
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
index 34699c2587..61e59f056a 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
@@ -54,7 +54,7 @@ import
org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.network.ClusterNodeResolver;
-import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
import org.apache.ignite.internal.placementdriver.PrimaryReplicaAwaitException;
import
org.apache.ignite.internal.placementdriver.PrimaryReplicaAwaitTimeoutException;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
@@ -70,7 +70,8 @@ import org.jetbrains.annotations.Nullable;
* Class tracks cluster leases in memory.
* At first, the class state recoveries from Vault, then updates on watch's
listener.
*/
-public class LeaseTracker extends AbstractEventProducer<PrimaryReplicaEvent,
PrimaryReplicaEventParameters> implements PlacementDriver {
+public class LeaseTracker extends AbstractEventProducer<PrimaryReplicaEvent,
PrimaryReplicaEventParameters> implements
+ LeasePlacementDriver {
/** Ignite logger. */
private static final IgniteLogger LOG =
Loggers.forClass(LeaseTracker.class);
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java
index 6f470b9635..7335ec85e1 100644
---
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java
@@ -158,7 +158,8 @@ public class LeaseNegotiationTest extends
BaseIgniteAbstractTest {
metaStorageManager,
pdLogicalTopologyService,
leaseTracker,
- new TestClockService(new HybridClockImpl())
+ new TestClockService(new HybridClockImpl()),
+ new AssignmentsTracker(metaStorageManager)
);
}
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
index 7caff8bb83..f96aa75251 100644
---
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
@@ -143,7 +143,8 @@ public class LeaseUpdaterTest extends
BaseIgniteAbstractTest {
metaStorageManager,
topologyService,
leaseTracker,
- new TestClockService(new HybridClockImpl())
+ new TestClockService(new HybridClockImpl()),
+ new AssignmentsTracker(metaStorageManager)
);
leaseUpdater.init();
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
index 3cfc042eea..84f916c719 100644
---
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.placementdriver;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.affinity.Assignment.forPeer;
import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
import static
org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_LEASES_KEY;
@@ -39,15 +40,21 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.affinity.Assignments;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
+import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -72,6 +79,7 @@ import org.apache.ignite.network.NetworkAddress;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
/** Tests to verify {@link LeaseTracker} implemented by {@link
PlacementDriver}. */
@@ -123,6 +131,19 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
GROUP_1
);
+ private static final String NODE_A_CONSITIENT_ID = "A";
+
+ private static final String NODE_B_CONSITIENT_ID = "B";
+
+ private static final String NODE_C_CONSITIENT_ID = "C";
+
+ private static final Set<Assignment> ASSIGNMENTS_A =
Set.of(forPeer(NODE_A_CONSITIENT_ID));
+
+ private static final Set<Assignment> ASSIGNMENTS_AB =
Set.of(forPeer(NODE_A_CONSITIENT_ID), forPeer(NODE_B_CONSITIENT_ID));
+
+ private static final Set<Assignment> ASSIGNMENTS_ABC =
+ Set.of(forPeer(NODE_A_CONSITIENT_ID),
forPeer(NODE_B_CONSITIENT_ID), forPeer(NODE_C_CONSITIENT_ID));
+
private static final int
AWAIT_PERIOD_FOR_LOCAL_NODE_TO_BE_NOTIFIED_ABOUT_LEASE_UPDATES = 1_000;
private static final int AWAIT_PRIMARY_REPLICA_TIMEOUT = 10;
@@ -133,7 +154,9 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
private final ClockService clockService = new TestClockService(new
HybridClockImpl());
- private LeaseTracker placementDriver;
+ private LeaseTracker leasePlacementDriver;
+
+ private AssignmentsTracker assignmentsPlacementDriver;
@Nullable
private ClusterNode leaseholder;
@@ -144,7 +167,9 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
revisionTracker = new PendingComparableValuesTracker<>(-1L);
- placementDriver = createPlacementDriver();
+ leasePlacementDriver = createPlacementDriver();
+
+ assignmentsPlacementDriver = createAssignmentsPlacementDriver();
metastore.registerRevisionUpdateListener(rev -> {
revisionTracker.update(rev, null);
@@ -158,7 +183,9 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
assertThat(recoveryFinishedFuture, willCompleteSuccessfully());
- placementDriver.startTrack(recoveryFinishedFuture.join());
+ leasePlacementDriver.startTrack(recoveryFinishedFuture.join());
+
+ assignmentsPlacementDriver.startTrack();
assertThat("Watches were not deployed", metastore.deployWatches(),
willCompleteSuccessfully());
@@ -168,7 +195,8 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
@AfterEach
void tearDown() throws Exception {
closeAll(
- placementDriver == null ? null : placementDriver::stopTrack,
+ assignmentsPlacementDriver == null ? null :
assignmentsPlacementDriver::stopTrack,
+ leasePlacementDriver == null ? null :
leasePlacementDriver::stopTrack,
metastore == null ? null : () ->
assertThat(metastore.stopAsync(new ComponentContext()),
willCompleteSuccessfully())
);
}
@@ -187,7 +215,7 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
@Test
public void testAwaitPrimaryReplicaInInterval() throws Exception {
// Await primary replica for time 10.
- CompletableFuture<ReplicaMeta> primaryReplicaFuture =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+ CompletableFuture<ReplicaMeta> primaryReplicaFuture =
leasePlacementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
assertFalse(primaryReplicaFuture.isDone());
@@ -195,7 +223,7 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
publishLease(LEASE_FROM_1_TO_5_000);
// Await local node to be notified about new primary replica.
- assertTrue(waitForCondition(() ->
placementDriver.getLease(GROUP_1).equals(LEASE_FROM_1_TO_5_000), 1_000));
+ assertTrue(waitForCondition(() ->
leasePlacementDriver.getLease(GROUP_1).equals(LEASE_FROM_1_TO_5_000), 1_000));
// Assert that primary await future isn't completed yet because
corresponding await time 10 is greater than lease expiration time 5.
assertFalse(primaryReplicaFuture.isDone());
@@ -225,7 +253,7 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
@Test
public void testAwaitPrimaryReplicaBeforeInterval() throws Exception {
// Await primary replica for time 10.
- CompletableFuture<ReplicaMeta> primaryReplicaFuture =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+ CompletableFuture<ReplicaMeta> primaryReplicaFuture =
leasePlacementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
assertFalse(primaryReplicaFuture.isDone());
@@ -233,7 +261,7 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
publishLease(LEASE_FROM_1_TO_5_000);
// Await local node to be notified about new primary replica.
- assertTrue(waitForCondition(() ->
placementDriver.getLease(GROUP_1).equals(LEASE_FROM_1_TO_5_000), 1_000));
+ assertTrue(waitForCondition(() ->
leasePlacementDriver.getLease(GROUP_1).equals(LEASE_FROM_1_TO_5_000), 1_000));
// Assert that primary await future isn't completed yet because
corresponding await time 10 is greater than lease expiration time 5.
assertFalse(primaryReplicaFuture.isDone());
@@ -263,11 +291,11 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
publishLease(LEASE_FROM_1_TO_15_000);
// Await local node to be notified about new primary replica.
- assertTrue(waitForCondition(() ->
placementDriver.getLease(GROUP_1).equals(LEASE_FROM_1_TO_15_000),
+ assertTrue(waitForCondition(() ->
leasePlacementDriver.getLease(GROUP_1).equals(LEASE_FROM_1_TO_15_000),
AWAIT_PERIOD_FOR_LOCAL_NODE_TO_BE_NOTIFIED_ABOUT_LEASE_UPDATES));
// Await primary replica for time 10.
- CompletableFuture<ReplicaMeta> primaryReplicaFuture =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+ CompletableFuture<ReplicaMeta> primaryReplicaFuture =
leasePlacementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
// Assert that primary waiter is completed.
@@ -321,7 +349,7 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
leaseholder = null;
CompletableFuture<ReplicaMeta> primaryReplicaFuture =
- placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_1_000,
awaitPrimaryReplicaTimeoutMilliseconds, MILLISECONDS);
+ leasePlacementDriver.awaitPrimaryReplica(GROUP_1,
AWAIT_TIME_1_000, awaitPrimaryReplicaTimeoutMilliseconds, MILLISECONDS);
assertFalse(primaryReplicaFuture.isDone());
@@ -420,9 +448,9 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
@Test
public void testTwoWaitersSameTime() throws Exception {
// Await primary replica for time 10 twice.
- CompletableFuture<ReplicaMeta> primaryReplicaFuture1 =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+ CompletableFuture<ReplicaMeta> primaryReplicaFuture1 =
leasePlacementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
- CompletableFuture<ReplicaMeta> primaryReplicaFuture2 =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+ CompletableFuture<ReplicaMeta> primaryReplicaFuture2 =
leasePlacementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
assertFalse(primaryReplicaFuture1.isDone());
@@ -456,9 +484,9 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
@Test
public void testTwoWaitersSameTimeFirstTimedOutSecondSucceed() throws
Exception {
// Await primary replica for time 10 twice.
- CompletableFuture<ReplicaMeta> primaryReplicaFuture1 =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+ CompletableFuture<ReplicaMeta> primaryReplicaFuture1 =
leasePlacementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
- CompletableFuture<ReplicaMeta> primaryReplicaFuture2 =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+ CompletableFuture<ReplicaMeta> primaryReplicaFuture2 =
leasePlacementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
assertFalse(primaryReplicaFuture1.isDone());
@@ -496,7 +524,7 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
@Test
public void testGetPrimaryReplica() throws Exception {
// Await primary replica for time 10.
- CompletableFuture<ReplicaMeta> primaryReplicaFuture =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+ CompletableFuture<ReplicaMeta> primaryReplicaFuture =
leasePlacementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
assertFalse(primaryReplicaFuture.isDone());
@@ -507,17 +535,17 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
assertThat(primaryReplicaFuture, willSucceedFast());
// Assert that retrieved primary replica for same awaiting timestamp
as within await ones will be completed immediately.
- CompletableFuture<ReplicaMeta> retrievedPrimaryReplicaSameTime =
placementDriver.getPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
+ CompletableFuture<ReplicaMeta> retrievedPrimaryReplicaSameTime =
leasePlacementDriver.getPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
assertTrue(retrievedPrimaryReplicaSameTime.isDone());
// Assert that retrieved primary replica for awaiting timestamp lt
lease expiration time will be completed immediately.
CompletableFuture<ReplicaMeta>
retrievedPrimaryReplicaTimeLtLeaseExpiration =
- placementDriver.getPrimaryReplica(GROUP_1, new
HybridTimestamp(14_000, 0));
+ leasePlacementDriver.getPrimaryReplica(GROUP_1, new
HybridTimestamp(14_000, 0));
assertTrue(retrievedPrimaryReplicaTimeLtLeaseExpiration.isDone());
// Assert that retrieved primary replica for awaiting timestamp gt
lease expiration time will be completed soon with null.
CompletableFuture<ReplicaMeta>
retrievedPrimaryReplicaTimeGtLeaseExpiration =
- placementDriver.getPrimaryReplica(GROUP_1, new
HybridTimestamp(16_000, 0));
+ leasePlacementDriver.getPrimaryReplica(GROUP_1, new
HybridTimestamp(16_000, 0));
assertThat(retrievedPrimaryReplicaTimeGtLeaseExpiration,
willSucceedFast());
assertNull(retrievedPrimaryReplicaTimeGtLeaseExpiration.get());
@@ -544,7 +572,7 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
@Test
public void testGetPrimaryReplicaWithLessThanClockSkewDiff() {
// Await primary replica for time 10.
- CompletableFuture<ReplicaMeta> primaryReplicaFuture =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+ CompletableFuture<ReplicaMeta> primaryReplicaFuture =
leasePlacementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
assertFalse(primaryReplicaFuture.isDone());
@@ -556,7 +584,7 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
// Assert that retrieved primary replica for timestamp less than
primaryReplica.expirationTimestamp - CLOCK_SKEW will return null.
assertThat(
- placementDriver.getPrimaryReplica(
+ leasePlacementDriver.getPrimaryReplica(
GROUP_1,
LEASE_FROM_1_TO_15_000.getExpirationTime()
.subtractPhysicalTime(clockService.maxClockSkewMillis())
@@ -638,6 +666,158 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
assertThat(eventParametersFuture, willTimeoutFast());
}
+ /**
+ * Ensure that AssignmentsPlacementDriver#getAssignments will await
cluster time and return stable assignments.
+ *
+ * <ol>
+ * <li>Request assignments for the timestamp < cluster time (MS safe
time).</li>
+ * <li>Ensure that assignments future is not completed.</li>
+ * <li>Publish stable, pending and planned assignments in order to
verify that pending and planed won't be retrieved by
+ * getAssignments.</li>
+ * <li>Ensure that assignments future was completed with published
stable assignments./li>
+ * </ol>
+ */
+ // Races are possible, so let's give it a better change to fail.
+ @RepeatedTest(100)
+ public void testGetAssignmentsAwaitsClusterTimeAndReturnAssignments()
throws Exception {
+ // Request assignments for the timestamp < cluster time (MS safe time).
+ CompletableFuture<TokenizedAssignments> assignmentsFuture =
assignmentsPlacementDriver.getAssignments(GROUP_1, clockService.now());
+
+ // Ensure that assignments future is not completed.
+ assertFalse(assignmentsFuture.isDone());
+
+ // Publish stable, pending and planned assignments in order to verify
that pending and planed won't be retrieved by getAssignments.
+ publishStableAssignments(ASSIGNMENTS_ABC);
+ publishPendingAssignments(ASSIGNMENTS_AB);
+ publishPlannedAssignments(ASSIGNMENTS_AB);
+
+ // Ensure that assignments future was completed with published stable
assignments.
+ assertThat(assignmentsFuture, willCompleteSuccessfully());
+ assertEquals(ASSIGNMENTS_ABC, assignmentsFuture.get().nodes());
+ }
+
+ /**
+ * Ensure that AssignmentsPlacementDriver#getAssignments will immediately
return stable assignments if clusterTimeToAwait has already
+ * passed.
+ *
+ * <ol>
+ * <li>Publish stable, pending and planned assignments in order to
verify that pending and planed won't be retrieved by
+ * getAssignments.</li>
+ * <li>Request assignments for already passed cluster time (MS safe
time).</li>
+ * <li>Ensure that assignments future is completed with published
stable assignments.</li>
+ * </ol>
+ */
+ // Races are possible, so let's give it a better change to fail.
+ @RepeatedTest(100)
+ public void
testGetAssignmentsImmediatelyReturnAssignmentsIfClusterTimeAlreadyPassed()
throws Exception {
+ HybridTimestamp requestTimestamp = clockService.now();
+
+ // Publish stable, pending and planned assignments in order to verify
that pending and planed won't be retrieved by getAssignments.
+ publishStableAssignments(ASSIGNMENTS_ABC);
+ publishPendingAssignments(ASSIGNMENTS_AB);
+ publishPlannedAssignments(ASSIGNMENTS_AB);
+
+ assertThat(metastore.clusterTime().waitFor(requestTimestamp),
willCompleteSuccessfully());
+
+ // Request assignments for already passed cluster time (MS safe time).
+ CompletableFuture<TokenizedAssignments> assignmentsFuture =
assignmentsPlacementDriver.getAssignments(GROUP_1, requestTimestamp);
+
+ // Ensure that assignments future is completed with published stable
assignments.
+ assertTrue(assignmentsFuture.isDone());
+ assertEquals(ASSIGNMENTS_ABC, assignmentsFuture.get().nodes());
+ }
+
+ /**
+ * Ensure that AssignmentsPlacementDriver#getAssignments will return the
future with null value if there are no stable assignments
+ * at the specified clusterAwaitTimestamp.
+ *
+ * <ol>
+ * <li>Request assignments for the timestamp < cluster time (MS safe
time).</li>
+ * <li>Ensure that assignments future is not completed.</li>
+ * <li>Publish **pending** assignments in order to increase cluster
time to the value greater then requested within
+ * getAssignments. Pay attention that not stable but pending
assignments were published. It's used in order to move cluster time.
+ * </li>
+ * <li>Ensure that assignments future was completed with null
value.</li>
+ * </ol>
+ */
+ // Races are possible, so let's give it a better change to fail.
+ @RepeatedTest(100)
+ public void testGetAssignmentsMayReturnFutureWithNullValue() throws
Exception {
+ // Request assignments for the timestamp < cluster time (MS safe time).
+ CompletableFuture<TokenizedAssignments> assignmentsFuture =
assignmentsPlacementDriver.getAssignments(GROUP_1, clockService.now());
+
+ // Ensure that assignments future is not completed.
+ assertFalse(assignmentsFuture.isDone());
+
+ // Publish **pending** assignments in order to increase cluster time
to the value greater then requested within getAssignments.
+ // Pay attention that not stable but pending assignments were
published. It's used in order to move cluster time.
+ publishPendingAssignments(ASSIGNMENTS_ABC);
+
+ // Ensure that assignments future was completed with null value.
+ assertThat(assignmentsFuture, willCompleteSuccessfully());
+ assertNull(assignmentsFuture.get());
+ }
+
+ /**
+ * Ensure that newest assignments are retrieved by
AssignmentsPlacementDriver#getAssignments.
+ *
+ * <ol>
+ * <li>Publish stable assignments.</li>
+ * <li>Request assignments for not requestTimestamp >=
ms.safeTime.</li>
+ * <li>Ensure that assignments future is not completed.</li>
+ * <li>Publish new stable assignments that besides publishing the
assignments will move ms.safeTime.</li>
+ * <li>Ensure that assignments retrieval future will complete
successfully with newest assignments and not initially published.</li>
+ * <li>Retrieve assignments one more time for the same request
timestamp.</li>
+ * <li>Ensure that assignments retrieval future will complete
successfully with same assignments and token.</li>
+ * <li>Publish yet another new stable assignments.</li>
+ * <li>Ensure that assignments retrieval future will complete
successfully with newest assignments and not previously retrieved.
+ * </li>
+ * <li>Ensure that assignments token was updated.</li>
+ * </ol>
+ */
+ // Races are possible, so let's give it a better change to fail.
+ @RepeatedTest(100)
+ public void
testGetAssignmentsReturnsNewestAssignmentsAssociatedWithSafeTimeGreaterThanRequested()
throws Exception {
+ // Publish stable assignments.
+ publishStableAssignments(ASSIGNMENTS_A);
+
+ // requestTimestamp >= clusterTime
+ HybridTimestamp requestTimestamp =
HybridTimestamp.hybridTimestamp(metastore.clusterTime().nowLong() + 1);
+
+ // Request assignments for not requestTimestamp >= ms.safeTime.
+ CompletableFuture<TokenizedAssignments> assignmentsFuture =
assignmentsPlacementDriver.getAssignments(GROUP_1, requestTimestamp);
+
+ // Ensure that assignments future is not completed.
+ assertFalse(assignmentsFuture.isDone());
+
+ // Publish new stable assignments that besides publishing the
assignments will move ms.safeTime.
+ publishStableAssignments(ASSIGNMENTS_AB);
+
+ // Ensure that assignments retrieval future will complete successfully
with newest assignments and not initially published.
+ assertThat(assignmentsFuture, willCompleteSuccessfully());
+ assertEquals(ASSIGNMENTS_AB, assignmentsFuture.get().nodes());
+ long assignmentsTokenAb = assignmentsFuture.get().token();
+
+ // Retrieve assignments one more time for the same request timestamp.
+ CompletableFuture<TokenizedAssignments> assignmentsFuture2 =
assignmentsPlacementDriver.getAssignments(GROUP_1, requestTimestamp);
+
+ // Ensure that assignments retrieval future will complete successfully
with same assignments and token.
+ assertEquals(ASSIGNMENTS_AB, assignmentsFuture.get().nodes());
+ assertEquals(assignmentsTokenAb, assignmentsFuture2.get().token());
+
+ // Publish yet another new stable assignments.
+ publishStableAssignments(ASSIGNMENTS_ABC);
+
+ // Request assignments for the same requestTimestamp
+ CompletableFuture<TokenizedAssignments> assignmentsFuture3 =
assignmentsPlacementDriver.getAssignments(GROUP_1, requestTimestamp);
+
+ // Ensure that assignments retrieval future will complete successfully
with newest assignments and not previously retrieved.
+ assertThat(assignmentsFuture3, willCompleteSuccessfully());
+ assertEquals(ASSIGNMENTS_ABC, assignmentsFuture3.get().nodes());
+ // Ensure that assignments token was updated.
+ assertNotEquals(assignmentsTokenAb, assignmentsFuture3.get().token());
+ }
+
private long publishLease(Lease lease) {
return publishLeases(lease);
}
@@ -658,6 +838,33 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
return expRev;
}
+ private void publishAssignments(ByteArray assignmentsKey, Set<Assignment>
assignments) {
+ long timestampBeforeUpdate = metastore.clusterTime().nowLong();
+
+ metastore.invoke(
+ Conditions.notExists(FAKE_KEY),
+ put(assignmentsKey, Assignments.toBytes(assignments)),
+ noop()
+ );
+
+ assertThat(
+
metastore.clusterTime().waitFor(HybridTimestamp.hybridTimestamp(timestampBeforeUpdate
+ 1)),
+ willCompleteSuccessfully()
+ );
+ }
+
+ private void publishStableAssignments(Set<Assignment> assignments) {
+ publishAssignments(RebalanceUtil.stablePartAssignmentsKey(GROUP_1),
assignments);
+ }
+
+ private void publishPendingAssignments(Set<Assignment> assignments) {
+ publishAssignments(RebalanceUtil.pendingPartAssignmentsKey(GROUP_1),
assignments);
+ }
+
+ private void publishPlannedAssignments(Set<Assignment> assignments) {
+ publishAssignments(RebalanceUtil.plannedPartAssignmentsKey(GROUP_1),
assignments);
+ }
+
private CompletableFuture<PrimaryReplicaEventParameters>
listenAnyReplicaBecomePrimaryEvent() {
return listenReplicaBecomePrimaryEvent(null);
}
@@ -669,7 +876,7 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
private CompletableFuture<PrimaryReplicaEventParameters>
listenReplicaBecomePrimaryEvent(@Nullable ReplicationGroupId groupId) {
var eventParametersFuture = new
CompletableFuture<PrimaryReplicaEventParameters>();
- placementDriver.listen(PRIMARY_REPLICA_ELECTED, parameters -> {
+ leasePlacementDriver.listen(PRIMARY_REPLICA_ELECTED, parameters -> {
if (groupId == null || groupId.equals(parameters.groupId())) {
eventParametersFuture.complete(parameters);
}
@@ -701,4 +908,8 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
}
}, clockService);
}
+
+ private AssignmentsTracker createAssignmentsPlacementDriver() {
+ return new AssignmentsTracker(metastore);
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
index 4ad473891e..eca347e990 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table.distributed.wrappers;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
import org.apache.ignite.internal.event.EventListener;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
@@ -62,4 +63,12 @@ abstract class DelegatingPlacementDriver implements
PlacementDriver {
public CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId
grpId) {
return delegate.previousPrimaryExpired(grpId);
}
+
+ @Override
+ public CompletableFuture<TokenizedAssignments> getAssignments(
+ ReplicationGroupId replicationGroupId,
+ HybridTimestamp clusterTimeToAwait
+ ) {
+ return delegate.getAssignments(replicationGroupId, clusterTimeToAwait);
+ }
}