This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new f98f98e91b IGNITE-22467 Add getAssignments method to the 
PlacementDriver (#3947)
f98f98e91b is described below

commit f98f98e91be5949dd018d5442af1f503e3596211
Author: Alexander Lapin <[email protected]>
AuthorDate: Fri Jun 21 19:19:36 2024 +0300

    IGNITE-22467 Add getAssignments method to the PlacementDriver (#3947)
---
 .../internal/affinity/TokenizedAssignments.java}   |  25 +-
 .../affinity/TokenizedAssignmentsImpl.java         |  62 +++++
 modules/client-handler/build.gradle                |   1 +
 .../ignite/client/handler/FakePlacementDriver.java |  12 +-
 modules/index/build.gradle                         |   1 +
 .../ignite/internal/index/TestPlacementDriver.java |  11 +
 .../replicator/utils/TestPlacementDriver.java      |  10 +
 modules/placement-driver-api/build.gradle          |   2 +
 .../AssignmentsPlacementDriver.java                |  43 ++++
 ...cementDriver.java => LeasePlacementDriver.java} |  16 +-
 .../internal/placementdriver/PlacementDriver.java  |  52 +----
 .../placementdriver/TestPlacementDriver.java       |   9 +
 .../placementdriver/AssignmentsTracker.java        |  34 ++-
 .../internal/placementdriver/LeaseUpdater.java     |  13 +-
 .../placementdriver/PlacementDriverManager.java    |  65 +++++-
 .../placementdriver/leases/LeaseTracker.java       |   5 +-
 .../placementdriver/LeaseNegotiationTest.java      |   3 +-
 .../internal/placementdriver/LeaseUpdaterTest.java |   3 +-
 .../placementdriver/PlacementDriverTest.java       | 255 +++++++++++++++++++--
 .../wrappers/DelegatingPlacementDriver.java        |   9 +
 20 files changed, 519 insertions(+), 112 deletions(-)

diff --git a/modules/placement-driver-api/build.gradle 
b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/TokenizedAssignments.java
similarity index 54%
copy from modules/placement-driver-api/build.gradle
copy to 
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/TokenizedAssignments.java
index 4a6385f667..887f841d1f 100644
--- a/modules/placement-driver-api/build.gradle
+++ 
b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/TokenizedAssignments.java
@@ -15,21 +15,18 @@
  * limitations under the License.
  */
 
-apply from: "$rootDir/buildscripts/java-core.gradle"
-apply from: "$rootDir/buildscripts/publishing.gradle"
-apply from: "$rootDir/buildscripts/java-junit5.gradle"
-apply from: "$rootDir/buildscripts/java-test-fixtures.gradle"
+package org.apache.ignite.internal.affinity;
 
-dependencies {
-    annotationProcessor project(":ignite-network-annotation-processor")
+import java.io.Serializable;
+import java.util.Set;
 
-    implementation project(':ignite-core')
-    implementation project(':ignite-network-api')
-    implementation libs.jetbrains.annotations
+/**
+ * Set of nodes along with associated token that is guaranteed to be changed 
if the set was changed.
+ */
+public interface TokenizedAssignments extends Serializable {
+    /** Returns a set of nodes, represented by this assignments instance. */
+    Set<Assignment> nodes();
 
-    testFixturesImplementation project(':ignite-core')
-    testFixturesImplementation project(':ignite-network-api')
-    testFixturesImplementation libs.jetbrains.annotations
+    /** Returns a token associated with given assignments that is guaranteed 
to be changed if assignments were changed. */
+    long token();
 }
-
-description = 'ignite-placement-driver-api'
diff --git 
a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/TokenizedAssignmentsImpl.java
 
b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/TokenizedAssignmentsImpl.java
new file mode 100644
index 0000000000..d3d7db5d82
--- /dev/null
+++ 
b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/TokenizedAssignmentsImpl.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import static java.util.Collections.unmodifiableSet;
+
+import java.util.Set;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Set of nodes along with associated token that is guaranteed to be changed 
if the set was changed.
+ */
+public class TokenizedAssignmentsImpl implements TokenizedAssignments {
+    private static final long serialVersionUID = -6960630542063056327L;
+
+    @IgniteToStringInclude
+    private final Set<Assignment> nodes;
+
+    private final long token;
+
+    /**
+     * The constructor.
+     *
+     * @param nodes Set of nodes.
+     * @param token Token.
+     */
+    public TokenizedAssignmentsImpl(Set<Assignment> nodes, long token) {
+        this.nodes = nodes;
+        this.token = token;
+    }
+
+    @Override
+    public Set<Assignment> nodes() {
+        return unmodifiableSet(nodes);
+    }
+
+    @Override
+    public long token() {
+        return token;
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(this);
+    }
+}
diff --git a/modules/client-handler/build.gradle 
b/modules/client-handler/build.gradle
index ad81fcb764..ec68884b44 100644
--- a/modules/client-handler/build.gradle
+++ b/modules/client-handler/build.gradle
@@ -90,6 +90,7 @@ dependencies {
     testFixturesImplementation project(':ignite-placement-driver-api')
     testFixturesImplementation project(':ignite-catalog')
     testFixturesImplementation project(':ignite-security-api')
+    testFixturesImplementation project(':ignite-affinity')
     testFixturesImplementation libs.mockito.junit
 }
 
diff --git 
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
 
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
index 5853de37f1..62c0fc17e8 100644
--- 
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
+++ 
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.client.handler;
 
+import static java.util.concurrent.CompletableFuture.failedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
 import java.util.ArrayList;
@@ -24,6 +25,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
 import org.apache.ignite.internal.event.AbstractEventProducer;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
@@ -89,7 +91,7 @@ public class FakePlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEve
         TablePartitionId id = (TablePartitionId) groupId;
 
         return returnError
-                ? CompletableFuture.failedFuture(new 
RuntimeException("FakePlacementDriver expected error"))
+                ? failedFuture(new RuntimeException("FakePlacementDriver 
expected error"))
                 : 
CompletableFuture.completedFuture(primaryReplicas.get(id.partitionId()));
     }
 
@@ -103,6 +105,14 @@ public class FakePlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEve
         return nullCompletedFuture();
     }
 
+    @Override
+    public CompletableFuture<TokenizedAssignments> getAssignments(
+            ReplicationGroupId replicationGroupId,
+            HybridTimestamp clusterTimeToAwait
+    ) {
+        return failedFuture(new 
UnsupportedOperationException("getAssignments() is not supported in 
FakePlacementDriver yet."));
+    }
+
     private static ReplicaMeta getReplicaMeta(String leaseholder, long 
leaseStartTime) {
         //noinspection serial
         return new ReplicaMeta() {
diff --git a/modules/index/build.gradle b/modules/index/build.gradle
index 79e031cfa3..a2b0a66c47 100644
--- a/modules/index/build.gradle
+++ b/modules/index/build.gradle
@@ -40,6 +40,7 @@ dependencies {
     implementation project(':ignite-cluster-management')
     implementation project(':ignite-low-watermark')
     implementation project(':ignite-partition-replicator')
+    implementation project(':ignite-affinity')
     implementation libs.jetbrains.annotations
 
     testImplementation(testFixtures(project(':ignite-configuration')))
diff --git 
a/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
 
b/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
index fe207e7588..fb81f9cee2 100644
--- 
a/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
+++ 
b/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
@@ -17,10 +17,13 @@
 
 package org.apache.ignite.internal.index;
 
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
 import org.apache.ignite.internal.event.AbstractEventProducer;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
@@ -54,6 +57,14 @@ class TestPlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEvent, Pri
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public CompletableFuture<TokenizedAssignments> getAssignments(
+            ReplicationGroupId replicationGroupId,
+            HybridTimestamp clusterTimeToAwait
+    ) {
+        return failedFuture(new 
UnsupportedOperationException("getAssignments() is not supported in 
FakePlacementDriver yet."));
+    }
+
     CompletableFuture<Void> setPrimaryReplicaMeta(
             long causalityToken,
             TablePartitionId replicaId,
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
index e5c6615bf4..51b207f66b 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
@@ -17,10 +17,12 @@
 
 package org.apache.ignite.internal.partition.replicator.utils;
 
+import static java.util.concurrent.CompletableFuture.failedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
 import org.apache.ignite.internal.event.AbstractEventProducer;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
@@ -64,6 +66,14 @@ public class TestPlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEve
         return nullCompletedFuture();
     }
 
+    @Override
+    public CompletableFuture<TokenizedAssignments> getAssignments(
+            ReplicationGroupId replicationGroupId,
+            HybridTimestamp clusterTimeToAwait
+    ) {
+        return failedFuture(new 
UnsupportedOperationException("getAssignments() is not supported in 
FakePlacementDriver yet."));
+    }
+
     private CompletableFuture<ReplicaMeta> getPrimaryReplicaMeta() {
         if (primary == null) {
             throw new IllegalStateException("Primary replica is not defined in 
test PlacementDriver");
diff --git a/modules/placement-driver-api/build.gradle 
b/modules/placement-driver-api/build.gradle
index 4a6385f667..a446f980f1 100644
--- a/modules/placement-driver-api/build.gradle
+++ b/modules/placement-driver-api/build.gradle
@@ -25,10 +25,12 @@ dependencies {
 
     implementation project(':ignite-core')
     implementation project(':ignite-network-api')
+    implementation project(':ignite-affinity')
     implementation libs.jetbrains.annotations
 
     testFixturesImplementation project(':ignite-core')
     testFixturesImplementation project(':ignite-network-api')
+    testFixturesImplementation project(':ignite-affinity')
     testFixturesImplementation libs.jetbrains.annotations
 }
 
diff --git 
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java
 
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java
new file mode 100644
index 0000000000..641ab117ec
--- /dev/null
+++ 
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+
+/**
+ * Service that provides an ability to await and retrieve assignments for 
replication groups.
+ * It's not guaranteed that retrieved assignments are ready to process 
requests, meaning that corresponding replicas
+ * may not be started yet, or may be stopped at the next tick. On the stable 
replication group, assignments however
+ * may be interpreted as replica hosts to process the requests.
+ */
+public interface AssignmentsPlacementDriver {
+
+    /**
+     * Returns the future with either newest available tokenized assignments 
for the specified replication group id or {@code null} if
+     * there are no such assignments. The future will be completed after 
clusterTime (meta storage safe time) becomes greater than or equal
+     * to the clusterTimeToAwait parameter.
+     *
+     * @param replicationGroupId Replication group Id.
+     * @param clusterTimeToAwait Cluster time to await.
+     * @return Tokenized assignments.
+     */
+    CompletableFuture<TokenizedAssignments> getAssignments(ReplicationGroupId 
replicationGroupId, HybridTimestamp clusterTimeToAwait);
+}
diff --git 
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
 
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/LeasePlacementDriver.java
similarity index 85%
copy from 
modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
copy to 
modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/LeasePlacementDriver.java
index f06e28a8b3..99f6d41822 100644
--- 
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
+++ 
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/LeasePlacementDriver.java
@@ -35,14 +35,14 @@ import 
org.apache.ignite.internal.replicator.ReplicationGroupId;
  * replication group.</p>
  */
 // TODO: https://issues.apache.org/jira/browse/IGNITE-20646 Consider using 
CLOCK_SKEW unaware await/getPrimaryReplica()
-public interface PlacementDriver extends EventProducer<PrimaryReplicaEvent, 
PrimaryReplicaEventParameters> {
+public interface LeasePlacementDriver extends 
EventProducer<PrimaryReplicaEvent, PrimaryReplicaEventParameters> {
     /**
      * Returns a future for the primary replica for the specified replication 
group whose expiration time (the right border of the
-     * corresponding lease interval) is greater than or equal to the 
(timestamp passed as a parameter - CLOCK_SKEW).
-     * Please pay attention that there are no restriction on the lease start 
time (left border),
-     * it can either be less or greater than or equal to proposed timestamp.
-     * Given method will await for an appropriate primary replica appearance 
if there's no already existing one. If the current lease
-     * is held by a node that is already not in a cluster, the future will be 
completed after the lease is transferred to another node.
+     * corresponding lease interval) is greater than or equal to the 
(timestamp passed as a parameter - CLOCK_SKEW). Please pay attention
+     * that there is no restriction on the lease start time (left border), it 
can either be less or greater than or equal to proposed
+     * timestamp. Given method will await for an appropriate primary replica 
appearance if there's no already existing one. If the current
+     * lease is held by a node that is no longer in the cluster, the future 
will be completed after the lease is transferred to another
+     * node.
      *
      * @param groupId Replication group id.
      * @param timestamp CLOCK_SKEW aware timestamp reference value.
@@ -74,8 +74,8 @@ public interface PlacementDriver extends 
EventProducer<PrimaryReplicaEvent, Prim
      * Returns a future that completes when all expiration event {@link 
PrimaryReplicaEvent#PRIMARY_REPLICA_EXPIRED} listeners of previous
      * primary complete.
      *
-     * @param grpId Replication group id.
+     * @param replicationGroupId Replication group id.
      * @return Future.
      */
-    CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId grpId);
+    CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId 
replicationGroupId);
 }
diff --git 
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
 
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
index f06e28a8b3..b2d6425e17 100644
--- 
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
+++ 
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
@@ -17,16 +17,11 @@
 
 package org.apache.ignite.internal.placementdriver;
 
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import org.apache.ignite.internal.event.EventProducer;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
-import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 
 /**
- * Service that provides an ability to await and retrieve primary replicas for 
replication groups.
+ * Service that provides an ability to await and retrieve primary replicas and 
assignments for replication groups.
  *
  * <p>Notes: If during recovery, the component needs to perform actions 
depending on whether the primary replica for some replication group
  * is a local node, then it needs to use {@link 
#getPrimaryReplica(ReplicationGroupId, HybridTimestamp)}. Then compare the 
local node with
@@ -34,48 +29,5 @@ import 
org.apache.ignite.internal.replicator.ReplicationGroupId;
  * {@link ReplicaMeta#getExpirationTime()}. And only then can we consider that 
the local node is the primary replica for the requested
  * replication group.</p>
  */
-// TODO: https://issues.apache.org/jira/browse/IGNITE-20646 Consider using 
CLOCK_SKEW unaware await/getPrimaryReplica()
-public interface PlacementDriver extends EventProducer<PrimaryReplicaEvent, 
PrimaryReplicaEventParameters> {
-    /**
-     * Returns a future for the primary replica for the specified replication 
group whose expiration time (the right border of the
-     * corresponding lease interval) is greater than or equal to the 
(timestamp passed as a parameter - CLOCK_SKEW).
-     * Please pay attention that there are no restriction on the lease start 
time (left border),
-     * it can either be less or greater than or equal to proposed timestamp.
-     * Given method will await for an appropriate primary replica appearance 
if there's no already existing one. If the current lease
-     * is held by a node that is already not in a cluster, the future will be 
completed after the lease is transferred to another node.
-     *
-     * @param groupId Replication group id.
-     * @param timestamp CLOCK_SKEW aware timestamp reference value.
-     * @param timeout How long to wait before completing exceptionally with a 
TimeoutException, in units of unit.
-     * @param unit A TimeUnit determining how to interpret the timeout 
parameter.
-     * @return Primary replica future.
-     * @throws PrimaryReplicaAwaitTimeoutException If primary replica await 
timed out.
-     * @throws PrimaryReplicaAwaitException If primary replica await failed 
with any other reason except timeout.
-     */
-    CompletableFuture<ReplicaMeta> awaitPrimaryReplica(
-            ReplicationGroupId groupId,
-            HybridTimestamp timestamp,
-            long timeout,
-            TimeUnit unit
-    );
-
-    /**
-     * Same as {@link #awaitPrimaryReplica(ReplicationGroupId, 
HybridTimestamp, long, TimeUnit)} despite the fact that given method await
-     * logic is bounded. It will wait for a primary replica for a reasonable 
period of time, and complete a future with null if a matching
-     * lease isn't found. Generally speaking reasonable here means enough for 
distribution across cluster nodes.
-     *
-     * @param replicationGroupId Replication group id.
-     * @param timestamp CLOCK_SKEW aware timestamp reference value.
-     * @return Primary replica future.
-     */
-    CompletableFuture<ReplicaMeta> getPrimaryReplica(ReplicationGroupId 
replicationGroupId, HybridTimestamp timestamp);
-
-    /**
-     * Returns a future that completes when all expiration event {@link 
PrimaryReplicaEvent#PRIMARY_REPLICA_EXPIRED} listeners of previous
-     * primary complete.
-     *
-     * @param grpId Replication group id.
-     * @return Future.
-     */
-    CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId grpId);
+public interface PlacementDriver extends LeasePlacementDriver, 
AssignmentsPlacementDriver {
 }
diff --git 
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
 
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
index 440cbbb309..3392cef934 100644
--- 
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
+++ 
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
@@ -24,6 +24,7 @@ import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
 import org.apache.ignite.internal.event.AbstractEventProducer;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
@@ -83,6 +84,14 @@ public class TestPlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEve
         return super.fireEvent(event, parameters);
     }
 
+    @Override
+    public CompletableFuture<TokenizedAssignments> getAssignments(
+            ReplicationGroupId replicationGroupId,
+            HybridTimestamp clusterTimeToAwait
+    ) {
+        return failedFuture(new 
UnsupportedOperationException("getAssignments() is not supported in 
FakePlacementDriver yet."));
+    }
+
     private CompletableFuture<ReplicaMeta> getReplicaMetaFuture() {
         try {
             return completedFuture(primaryReplicaSupplier.get());
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
index 11e4da4de6..b71a9f7f4c 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.placementdriver;
 
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.StringUtils.incrementLastChar;
 
 import java.nio.charset.StandardCharsets;
@@ -29,6 +30,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.affinity.Assignment;
 import org.apache.ignite.internal.affinity.Assignments;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
+import org.apache.ignite.internal.affinity.TokenizedAssignmentsImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -40,23 +44,30 @@ import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 
 /**
  * The class tracks assignment of all replication groups.
  */
-public class AssignmentsTracker {
+public class AssignmentsTracker implements AssignmentsPlacementDriver {
     /** Ignite logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(AssignmentsTracker.class);
 
+    // TODO Not sure whether it should be instantiated here or propagated from 
PDM.
+    // TODO Use it on stop, etc.
+    /** Busy lock to linearize service public API calls and service stop. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
     /** Meta storage manager. */
     private final MetaStorageManager msManager;
 
     /** Map replication group id to assignment nodes. */
-    private final Map<ReplicationGroupId, Set<Assignment>> groupAssignments;
+    private final Map<ReplicationGroupId, TokenizedAssignments> 
groupAssignments;
 
     /** Assignment Meta storage watch listener. */
     private final AssignmentsListener assignmentsListener;
 
+
     /**
      * The constructor.
      *
@@ -96,9 +107,9 @@ public class AssignmentsTracker {
 
                     TablePartitionId grpId = 
TablePartitionId.fromString(strKey);
 
-                    Set<Assignment> assignments = 
Assignments.fromBytes(entry.value()).nodes();
+                    Set<Assignment> assignmentNodes = 
Assignments.fromBytes(entry.value()).nodes();
 
-                    groupAssignments.put(grpId, assignments);
+                    groupAssignments.put(grpId, new 
TokenizedAssignmentsImpl(assignmentNodes, entry.revision()));
                 }
             }
         }).whenComplete((res, ex) -> {
@@ -117,12 +128,23 @@ public class AssignmentsTracker {
         msManager.unregisterWatch(assignmentsListener);
     }
 
+    @Override
+    public CompletableFuture<TokenizedAssignments> getAssignments(
+            ReplicationGroupId replicationGroupId,
+            HybridTimestamp clusterTimeToAwait
+    ) {
+        return msManager
+                .clusterTime()
+                .waitFor(clusterTimeToAwait)
+                .thenApply(ignored -> inBusyLock(busyLock, () -> 
assignments().get(replicationGroupId)));
+    }
+
     /**
      * Gets assignments.
      *
      * @return Map replication group id to its assignment.
      */
-    public Map<ReplicationGroupId, Set<Assignment>> assignments() {
+    public Map<ReplicationGroupId, TokenizedAssignments> assignments() {
         return groupAssignments;
     }
 
@@ -151,7 +173,7 @@ public class AssignmentsTracker {
                     groupAssignments.remove(replicationGrpId);
                 } else {
                     Set<Assignment> newAssignments = 
Assignments.fromBytes(entry.value()).nodes();
-                    groupAssignments.put(replicationGrpId, newAssignments);
+                    groupAssignments.put(replicationGrpId, new 
TokenizedAssignmentsImpl(newAssignments, entry.revision()));
                 }
             }
 
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
index 3ed175c099..8d77d1fff4 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
@@ -35,6 +35,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -126,6 +127,7 @@ public class LeaseUpdater {
      * @param topologyService Topology service.
      * @param leaseTracker Lease tracker.
      * @param clockService Clock service.
+     * @param assignmentsTracker Assignments tracker.
      */
     LeaseUpdater(
             String nodeName,
@@ -133,7 +135,8 @@ public class LeaseUpdater {
             MetaStorageManager msManager,
             LogicalTopologyService topologyService,
             LeaseTracker leaseTracker,
-            ClockService clockService
+            ClockService clockService,
+            AssignmentsTracker assignmentsTracker
     ) {
         this.nodeName = nodeName;
         this.clusterService = clusterService;
@@ -142,7 +145,7 @@ public class LeaseUpdater {
         this.clockService = clockService;
 
         this.longLeaseInterval = 
IgniteSystemProperties.getLong("IGNITE_LONG_LEASE", 120_000);
-        this.assignmentsTracker = new AssignmentsTracker(msManager);
+        this.assignmentsTracker = assignmentsTracker;
         this.topologyTracker = new TopologyTracker(topologyService);
         this.updater = new Updater();
 
@@ -342,7 +345,7 @@ public class LeaseUpdater {
             Map<ReplicationGroupId, Boolean> toBeNegotiated = new HashMap<>();
             Map<ReplicationGroupId, Lease> renewedLeases = new 
HashMap<>(leasesCurrent.leaseByGroupId());
 
-            Map<ReplicationGroupId, Set<Assignment>> currentAssignments = 
assignmentsTracker.assignments();
+            Map<ReplicationGroupId, TokenizedAssignments> currentAssignments = 
assignmentsTracker.assignments();
             Set<ReplicationGroupId> currentAssignmentsReplicationGroupIds = 
currentAssignments.keySet();
 
             // Remove all expired leases that are no longer present in 
assignments.
@@ -352,9 +355,9 @@ public class LeaseUpdater {
             int currentAssignmentsSize = currentAssignments.size();
             int activeLeasesCount = 0;
 
-            for (Map.Entry<ReplicationGroupId, Set<Assignment>> entry : 
currentAssignments.entrySet()) {
+            for (Map.Entry<ReplicationGroupId, TokenizedAssignments> entry : 
currentAssignments.entrySet()) {
                 ReplicationGroupId grpId = entry.getKey();
-                Set<Assignment> assignments = entry.getValue();
+                Set<Assignment> assignments = entry.getValue().nodes();
 
                 Lease lease = leaseTracker.getLease(grpId);
 
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
index 6367931908..a798c7a72a 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
@@ -23,11 +23,15 @@ import static 
org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import org.apache.ignite.internal.event.EventListener;
 import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -36,6 +40,8 @@ import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
+import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
 import org.apache.ignite.internal.placementdriver.leases.LeaseTracker;
 import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.RaftManager;
@@ -89,6 +95,10 @@ public class PlacementDriverManager implements 
IgniteComponent {
     /** Meta Storage manager. */
     private final MetaStorageManager metastore;
 
+    private final AssignmentsTracker assignmentsTracker;
+
+    private final PlacementDriver placementDriver;
+
     /**
      * Constructor.
      *
@@ -124,14 +134,19 @@ public class PlacementDriverManager implements 
IgniteComponent {
 
         this.leaseTracker = new LeaseTracker(metastore, 
clusterService.topologyService(), clockService);
 
+        this.assignmentsTracker = new AssignmentsTracker(metastore);
+
         this.leaseUpdater = new LeaseUpdater(
                 nodeName,
                 clusterService,
                 metastore,
                 logicalTopologyService,
                 leaseTracker,
-                clockService
+                clockService,
+                assignmentsTracker
         );
+
+        this.placementDriver = createPlacementDriver();
     }
 
     @Override
@@ -241,7 +256,7 @@ public class PlacementDriverManager implements 
IgniteComponent {
 
     /** Returns placement driver service. */
     public PlacementDriver placementDriver() {
-        return leaseTracker;
+        return placementDriver;
     }
 
     private void recoverInternalComponentsBusy() {
@@ -253,4 +268,50 @@ public class PlacementDriverManager implements 
IgniteComponent {
 
         leaseTracker.startTrack(recoveryRevision);
     }
+
+    private PlacementDriver createPlacementDriver() {
+        return new PlacementDriver() {
+            @Override
+            public CompletableFuture<TokenizedAssignments> getAssignments(
+                    ReplicationGroupId replicationGroupId,
+                    HybridTimestamp timestamp
+            ) {
+                return assignmentsTracker.getAssignments(replicationGroupId, 
timestamp);
+            }
+
+            @Override
+            public CompletableFuture<ReplicaMeta> awaitPrimaryReplica(
+                    ReplicationGroupId groupId,
+                    HybridTimestamp timestamp,
+                    long timeout,
+                    TimeUnit unit) {
+                return leaseTracker.awaitPrimaryReplica(
+                        groupId,
+                        timestamp,
+                        timeout,
+                        unit
+                );
+            }
+
+            @Override
+            public CompletableFuture<ReplicaMeta> 
getPrimaryReplica(ReplicationGroupId replicationGroupId, HybridTimestamp 
timestamp) {
+                return leaseTracker.getPrimaryReplica(replicationGroupId, 
timestamp);
+            }
+
+            @Override
+            public CompletableFuture<Void> 
previousPrimaryExpired(ReplicationGroupId replicationGroupId) {
+                return leaseTracker.previousPrimaryExpired(replicationGroupId);
+            }
+
+            @Override
+            public void listen(PrimaryReplicaEvent evt, EventListener<? 
extends PrimaryReplicaEventParameters> listener) {
+                leaseTracker.listen(evt, listener);
+            }
+
+            @Override
+            public void removeListener(PrimaryReplicaEvent evt, 
EventListener<? extends PrimaryReplicaEventParameters> listener) {
+                leaseTracker.removeListener(evt, listener);
+            }
+        };
+    }
 }
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
index 34699c2587..61e59f056a 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
@@ -54,7 +54,7 @@ import 
org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.network.ClusterNodeResolver;
-import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
 import org.apache.ignite.internal.placementdriver.PrimaryReplicaAwaitException;
 import 
org.apache.ignite.internal.placementdriver.PrimaryReplicaAwaitTimeoutException;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
@@ -70,7 +70,8 @@ import org.jetbrains.annotations.Nullable;
  * Class tracks cluster leases in memory.
  * At first, the class state recoveries from Vault, then updates on watch's 
listener.
  */
-public class LeaseTracker extends AbstractEventProducer<PrimaryReplicaEvent, 
PrimaryReplicaEventParameters> implements PlacementDriver {
+public class LeaseTracker extends AbstractEventProducer<PrimaryReplicaEvent, 
PrimaryReplicaEventParameters> implements
+        LeasePlacementDriver {
     /** Ignite logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(LeaseTracker.class);
 
diff --git 
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java
 
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java
index 6f470b9635..7335ec85e1 100644
--- 
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java
+++ 
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java
@@ -158,7 +158,8 @@ public class LeaseNegotiationTest extends 
BaseIgniteAbstractTest {
                 metaStorageManager,
                 pdLogicalTopologyService,
                 leaseTracker,
-                new TestClockService(new HybridClockImpl())
+                new TestClockService(new HybridClockImpl()),
+                new AssignmentsTracker(metaStorageManager)
         );
     }
 
diff --git 
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
 
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
index 7caff8bb83..f96aa75251 100644
--- 
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
+++ 
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
@@ -143,7 +143,8 @@ public class LeaseUpdaterTest extends 
BaseIgniteAbstractTest {
                 metaStorageManager,
                 topologyService,
                 leaseTracker,
-                new TestClockService(new HybridClockImpl())
+                new TestClockService(new HybridClockImpl()),
+                new AssignmentsTracker(metaStorageManager)
         );
 
         leaseUpdater.init();
diff --git 
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
 
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
index 3cfc042eea..84f916c719 100644
--- 
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
+++ 
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.placementdriver;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.affinity.Assignment.forPeer;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
 import static 
org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_LEASES_KEY;
@@ -39,15 +40,21 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.affinity.Assignments;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
+import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil;
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -72,6 +79,7 @@ import org.apache.ignite.network.NetworkAddress;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 
 /** Tests to verify {@link LeaseTracker} implemented by {@link 
PlacementDriver}. */
@@ -123,6 +131,19 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
             GROUP_1
     );
 
+    private static final String NODE_A_CONSITIENT_ID = "A";
+
+    private static final String NODE_B_CONSITIENT_ID = "B";
+
+    private static final String NODE_C_CONSITIENT_ID = "C";
+
+    private static final Set<Assignment> ASSIGNMENTS_A = 
Set.of(forPeer(NODE_A_CONSITIENT_ID));
+
+    private static final Set<Assignment> ASSIGNMENTS_AB = 
Set.of(forPeer(NODE_A_CONSITIENT_ID), forPeer(NODE_B_CONSITIENT_ID));
+
+    private static final Set<Assignment> ASSIGNMENTS_ABC =
+            Set.of(forPeer(NODE_A_CONSITIENT_ID), 
forPeer(NODE_B_CONSITIENT_ID), forPeer(NODE_C_CONSITIENT_ID));
+
     private static final int 
AWAIT_PERIOD_FOR_LOCAL_NODE_TO_BE_NOTIFIED_ABOUT_LEASE_UPDATES = 1_000;
 
     private static final int AWAIT_PRIMARY_REPLICA_TIMEOUT = 10;
@@ -133,7 +154,9 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
 
     private final ClockService clockService = new TestClockService(new 
HybridClockImpl());
 
-    private LeaseTracker placementDriver;
+    private LeaseTracker leasePlacementDriver;
+
+    private AssignmentsTracker assignmentsPlacementDriver;
 
     @Nullable
     private ClusterNode leaseholder;
@@ -144,7 +167,9 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
 
         revisionTracker = new PendingComparableValuesTracker<>(-1L);
 
-        placementDriver = createPlacementDriver();
+        leasePlacementDriver = createPlacementDriver();
+
+        assignmentsPlacementDriver = createAssignmentsPlacementDriver();
 
         metastore.registerRevisionUpdateListener(rev -> {
             revisionTracker.update(rev, null);
@@ -158,7 +183,9 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
 
         assertThat(recoveryFinishedFuture, willCompleteSuccessfully());
 
-        placementDriver.startTrack(recoveryFinishedFuture.join());
+        leasePlacementDriver.startTrack(recoveryFinishedFuture.join());
+
+        assignmentsPlacementDriver.startTrack();
 
         assertThat("Watches were not deployed", metastore.deployWatches(), 
willCompleteSuccessfully());
 
@@ -168,7 +195,8 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
     @AfterEach
     void tearDown() throws Exception {
         closeAll(
-                placementDriver == null ? null : placementDriver::stopTrack,
+                assignmentsPlacementDriver == null ? null : 
assignmentsPlacementDriver::stopTrack,
+                leasePlacementDriver == null ? null : 
leasePlacementDriver::stopTrack,
                 metastore == null ? null : () -> 
assertThat(metastore.stopAsync(new ComponentContext()), 
willCompleteSuccessfully())
         );
     }
@@ -187,7 +215,7 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
     @Test
     public void testAwaitPrimaryReplicaInInterval() throws Exception {
         // Await primary replica for time 10.
-        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
leasePlacementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
                 AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
         assertFalse(primaryReplicaFuture.isDone());
 
@@ -195,7 +223,7 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
         publishLease(LEASE_FROM_1_TO_5_000);
 
         // Await local node to be notified about new primary replica.
-        assertTrue(waitForCondition(() -> 
placementDriver.getLease(GROUP_1).equals(LEASE_FROM_1_TO_5_000), 1_000));
+        assertTrue(waitForCondition(() -> 
leasePlacementDriver.getLease(GROUP_1).equals(LEASE_FROM_1_TO_5_000), 1_000));
 
         // Assert that primary await future isn't completed yet because 
corresponding await time 10 is greater than lease expiration time 5.
         assertFalse(primaryReplicaFuture.isDone());
@@ -225,7 +253,7 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
     @Test
     public void testAwaitPrimaryReplicaBeforeInterval() throws Exception {
         // Await primary replica for time 10.
-        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
leasePlacementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
                 AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
         assertFalse(primaryReplicaFuture.isDone());
 
@@ -233,7 +261,7 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
         publishLease(LEASE_FROM_1_TO_5_000);
 
         // Await local node to be notified about new primary replica.
-        assertTrue(waitForCondition(() -> 
placementDriver.getLease(GROUP_1).equals(LEASE_FROM_1_TO_5_000), 1_000));
+        assertTrue(waitForCondition(() -> 
leasePlacementDriver.getLease(GROUP_1).equals(LEASE_FROM_1_TO_5_000), 1_000));
 
         // Assert that primary await future isn't completed yet because 
corresponding await time 10 is greater than lease expiration time 5.
         assertFalse(primaryReplicaFuture.isDone());
@@ -263,11 +291,11 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
         publishLease(LEASE_FROM_1_TO_15_000);
 
         // Await local node to be notified about new primary replica.
-        assertTrue(waitForCondition(() -> 
placementDriver.getLease(GROUP_1).equals(LEASE_FROM_1_TO_15_000),
+        assertTrue(waitForCondition(() -> 
leasePlacementDriver.getLease(GROUP_1).equals(LEASE_FROM_1_TO_15_000),
                 
AWAIT_PERIOD_FOR_LOCAL_NODE_TO_BE_NOTIFIED_ABOUT_LEASE_UPDATES));
 
         // Await primary replica for time 10.
-        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
leasePlacementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
                 AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
 
         // Assert that primary waiter is completed.
@@ -321,7 +349,7 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
         leaseholder = null;
 
         CompletableFuture<ReplicaMeta> primaryReplicaFuture =
-                placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_1_000, 
awaitPrimaryReplicaTimeoutMilliseconds, MILLISECONDS);
+                leasePlacementDriver.awaitPrimaryReplica(GROUP_1, 
AWAIT_TIME_1_000, awaitPrimaryReplicaTimeoutMilliseconds, MILLISECONDS);
 
         assertFalse(primaryReplicaFuture.isDone());
 
@@ -420,9 +448,9 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
     @Test
     public void testTwoWaitersSameTime() throws Exception {
         // Await primary replica for time 10 twice.
-        CompletableFuture<ReplicaMeta> primaryReplicaFuture1 = 
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture1 = 
leasePlacementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
                 AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
-        CompletableFuture<ReplicaMeta> primaryReplicaFuture2 = 
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture2 = 
leasePlacementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
                 AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
 
         assertFalse(primaryReplicaFuture1.isDone());
@@ -456,9 +484,9 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
     @Test
     public void testTwoWaitersSameTimeFirstTimedOutSecondSucceed() throws 
Exception {
         // Await primary replica for time 10 twice.
-        CompletableFuture<ReplicaMeta> primaryReplicaFuture1 = 
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture1 = 
leasePlacementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
                 AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
-        CompletableFuture<ReplicaMeta> primaryReplicaFuture2 = 
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture2 = 
leasePlacementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
                 AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
 
         assertFalse(primaryReplicaFuture1.isDone());
@@ -496,7 +524,7 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
     @Test
     public void testGetPrimaryReplica() throws Exception {
         // Await primary replica for time 10.
-        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
leasePlacementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
                 AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
         assertFalse(primaryReplicaFuture.isDone());
 
@@ -507,17 +535,17 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
         assertThat(primaryReplicaFuture, willSucceedFast());
 
         // Assert that retrieved primary replica for same awaiting timestamp 
as within await ones will be completed immediately.
-        CompletableFuture<ReplicaMeta> retrievedPrimaryReplicaSameTime = 
placementDriver.getPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
+        CompletableFuture<ReplicaMeta> retrievedPrimaryReplicaSameTime = 
leasePlacementDriver.getPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
         assertTrue(retrievedPrimaryReplicaSameTime.isDone());
 
         // Assert that retrieved primary replica for awaiting timestamp lt 
lease expiration time will be completed immediately.
         CompletableFuture<ReplicaMeta> 
retrievedPrimaryReplicaTimeLtLeaseExpiration =
-                placementDriver.getPrimaryReplica(GROUP_1, new 
HybridTimestamp(14_000, 0));
+                leasePlacementDriver.getPrimaryReplica(GROUP_1, new 
HybridTimestamp(14_000, 0));
         assertTrue(retrievedPrimaryReplicaTimeLtLeaseExpiration.isDone());
 
         // Assert that retrieved primary replica for awaiting timestamp gt 
lease expiration time will be completed soon with null.
         CompletableFuture<ReplicaMeta> 
retrievedPrimaryReplicaTimeGtLeaseExpiration =
-                placementDriver.getPrimaryReplica(GROUP_1, new 
HybridTimestamp(16_000, 0));
+                leasePlacementDriver.getPrimaryReplica(GROUP_1, new 
HybridTimestamp(16_000, 0));
 
         assertThat(retrievedPrimaryReplicaTimeGtLeaseExpiration, 
willSucceedFast());
         assertNull(retrievedPrimaryReplicaTimeGtLeaseExpiration.get());
@@ -544,7 +572,7 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
     @Test
     public void testGetPrimaryReplicaWithLessThanClockSkewDiff() {
         // Await primary replica for time 10.
-        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
leasePlacementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
                 AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
         assertFalse(primaryReplicaFuture.isDone());
 
@@ -556,7 +584,7 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
 
         // Assert that retrieved primary replica for timestamp less than 
primaryReplica.expirationTimestamp - CLOCK_SKEW will return null.
         assertThat(
-                placementDriver.getPrimaryReplica(
+                leasePlacementDriver.getPrimaryReplica(
                         GROUP_1,
                         LEASE_FROM_1_TO_15_000.getExpirationTime()
                                 
.subtractPhysicalTime(clockService.maxClockSkewMillis())
@@ -638,6 +666,158 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
         assertThat(eventParametersFuture, willTimeoutFast());
     }
 
+    /**
+     * Ensure that AssignmentsPlacementDriver#getAssignments will await 
cluster time and return stable assignments.
+     *
+     * <ol>
+     *     <li>Request assignments for a timestamp that is ahead of cluster time (MS safe 
time).</li>
+     *     <li>Ensure that assignments future is not completed.</li>
+     *     <li>Publish stable, pending and planned assignments in order to 
verify that pending and planned won't be retrieved by
+     *     getAssignments.</li>
+     *     <li>Ensure that assignments future was completed with published 
stable assignments.</li>
+     * </ol>
+     */
+    // Races are possible, so let's give it a better chance to fail.
+    @RepeatedTest(100)
+    public void testGetAssignmentsAwaitsClusterTimeAndReturnAssignments() 
throws Exception {
+        // Request assignments for a timestamp that is ahead of cluster time (MS safe time).
+        CompletableFuture<TokenizedAssignments> assignmentsFuture = 
assignmentsPlacementDriver.getAssignments(GROUP_1, clockService.now());
+
+        // Ensure that assignments future is not completed.
+        assertFalse(assignmentsFuture.isDone());
+
+        // Publish stable, pending and planned assignments in order to verify 
that pending and planned won't be retrieved by getAssignments.
+        publishStableAssignments(ASSIGNMENTS_ABC);
+        publishPendingAssignments(ASSIGNMENTS_AB);
+        publishPlannedAssignments(ASSIGNMENTS_AB);
+
+        // Ensure that assignments future was completed with published stable 
assignments.
+        assertThat(assignmentsFuture, willCompleteSuccessfully());
+        assertEquals(ASSIGNMENTS_ABC, assignmentsFuture.get().nodes());
+    }
+
+    /**
+     * Ensure that AssignmentsPlacementDriver#getAssignments will immediately 
return stable assignments if clusterTimeToAwait has already
+     * passed.
+     *
+     * <ol>
+     *     <li>Publish stable, pending and planned assignments in order to 
verify that pending and planned won't be retrieved by
+     *     getAssignments.</li>
+     *     <li>Request assignments for already passed cluster time (MS safe 
time).</li>
+     *     <li>Ensure that assignments future is completed with published 
stable assignments.</li>
+     * </ol>
+     */
+    // Races are possible, so let's give it a better chance to fail.
+    @RepeatedTest(100)
+    public void 
testGetAssignmentsImmediatelyReturnAssignmentsIfClusterTimeAlreadyPassed() 
throws Exception {
+        HybridTimestamp requestTimestamp = clockService.now();
+
+        // Publish stable, pending and planned assignments in order to verify 
that pending and planned won't be retrieved by getAssignments.
+        publishStableAssignments(ASSIGNMENTS_ABC);
+        publishPendingAssignments(ASSIGNMENTS_AB);
+        publishPlannedAssignments(ASSIGNMENTS_AB);
+
+        assertThat(metastore.clusterTime().waitFor(requestTimestamp), 
willCompleteSuccessfully());
+
+        // Request assignments for already passed cluster time (MS safe time).
+        CompletableFuture<TokenizedAssignments> assignmentsFuture = 
assignmentsPlacementDriver.getAssignments(GROUP_1, requestTimestamp);
+
+        // Ensure that assignments future is completed with published stable 
assignments.
+        assertTrue(assignmentsFuture.isDone());
+        assertEquals(ASSIGNMENTS_ABC, assignmentsFuture.get().nodes());
+    }
+
+    /**
+     * Ensure that AssignmentsPlacementDriver#getAssignments will return the 
future with null value if there are no stable assignments
+     * at the specified clusterAwaitTimestamp.
+     *
+     * <ol>
+     *     <li>Request assignments for a timestamp that is ahead of cluster time (MS safe 
time).</li>
+     *     <li>Ensure that assignments future is not completed.</li>
+     *     <li>Publish **pending** assignments in order to increase cluster 
time to the value greater than requested within
+     *     getAssignments. Pay attention that not stable but pending 
assignments were published. It's used in order to move cluster time.
+     *     </li>
+     *     <li>Ensure that assignments future was completed with null 
value.</li>
+     * </ol>
+     */
+    // Races are possible, so let's give it a better chance to fail.
+    @RepeatedTest(100)
+    public void testGetAssignmentsMayReturnFutureWithNullValue() throws 
Exception {
+        // Request assignments for a timestamp that is ahead of cluster time (MS safe time).
+        CompletableFuture<TokenizedAssignments> assignmentsFuture = 
assignmentsPlacementDriver.getAssignments(GROUP_1, clockService.now());
+
+        // Ensure that assignments future is not completed.
+        assertFalse(assignmentsFuture.isDone());
+
+        // Publish **pending** assignments in order to increase cluster time 
to the value greater than requested within getAssignments.
+        // Pay attention that not stable but pending assignments were 
published. It's used in order to move cluster time.
+        publishPendingAssignments(ASSIGNMENTS_ABC);
+
+        // Ensure that assignments future was completed with null value.
+        assertThat(assignmentsFuture, willCompleteSuccessfully());
+        assertNull(assignmentsFuture.get());
+    }
+
+    /**
+     * Ensure that newest assignments are retrieved by 
AssignmentsPlacementDriver#getAssignments.
+     *
+     * <ol>
+     *     <li>Publish stable assignments.</li>
+     *     <li>Request assignments for requestTimestamp >= 
ms.safeTime.</li>
+     *     <li>Ensure that assignments future is not completed.</li>
+     *     <li>Publish new stable assignments that besides publishing the 
assignments will move ms.safeTime.</li>
+     *     <li>Ensure that assignments retrieval future will complete 
successfully with newest assignments and not initially published.</li>
+     *     <li>Retrieve assignments one more time for the same request 
timestamp.</li>
+     *     <li>Ensure that assignments retrieval future will complete 
successfully with same assignments and token.</li>
+     *     <li>Publish yet another new stable assignments.</li>
+     *     <li>Ensure that assignments retrieval future will complete 
successfully with newest assignments and not previously retrieved.
+     *     </li>
+     *     <li>Ensure that assignments token was updated.</li>
+     * </ol>
+     */
+    // Races are possible, so let's give it a better chance to fail.
+    @RepeatedTest(100)
+    public void 
testGetAssignmentsReturnsNewestAssignmentsAssociatedWithSafeTimeGreaterThanRequested()
 throws Exception {
+        // Publish stable assignments.
+        publishStableAssignments(ASSIGNMENTS_A);
+
+        // requestTimestamp >= clusterTime
+        HybridTimestamp requestTimestamp = 
HybridTimestamp.hybridTimestamp(metastore.clusterTime().nowLong() + 1);
+
+        // Request assignments for requestTimestamp >= ms.safeTime.
+        CompletableFuture<TokenizedAssignments> assignmentsFuture = 
assignmentsPlacementDriver.getAssignments(GROUP_1, requestTimestamp);
+
+        // Ensure that assignments future is not completed.
+        assertFalse(assignmentsFuture.isDone());
+
+        // Publish new stable assignments that besides publishing the 
assignments will move ms.safeTime.
+        publishStableAssignments(ASSIGNMENTS_AB);
+
+        // Ensure that assignments retrieval future will complete successfully 
with newest assignments and not initially published.
+        assertThat(assignmentsFuture, willCompleteSuccessfully());
+        assertEquals(ASSIGNMENTS_AB, assignmentsFuture.get().nodes());
+        long assignmentsTokenAb = assignmentsFuture.get().token();
+
+        // Retrieve assignments one more time for the same request timestamp.
+        CompletableFuture<TokenizedAssignments> assignmentsFuture2 = 
assignmentsPlacementDriver.getAssignments(GROUP_1, requestTimestamp);
+
+        // Ensure that assignments retrieval future will complete successfully 
with same assignments and token.
+        assertEquals(ASSIGNMENTS_AB, assignmentsFuture2.get().nodes());
+        assertEquals(assignmentsTokenAb, assignmentsFuture2.get().token());
+
+        // Publish yet another new stable assignments.
+        publishStableAssignments(ASSIGNMENTS_ABC);
+
+        // Request assignments for the same requestTimestamp
+        CompletableFuture<TokenizedAssignments> assignmentsFuture3 = 
assignmentsPlacementDriver.getAssignments(GROUP_1, requestTimestamp);
+
+        // Ensure that assignments retrieval future will complete successfully 
with newest assignments and not previously retrieved.
+        assertThat(assignmentsFuture3, willCompleteSuccessfully());
+        assertEquals(ASSIGNMENTS_ABC, assignmentsFuture3.get().nodes());
+        // Ensure that assignments token was updated.
+        assertNotEquals(assignmentsTokenAb, assignmentsFuture3.get().token());
+    }
+
     private long publishLease(Lease lease) {
         return publishLeases(lease);
     }
@@ -658,6 +838,33 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
         return expRev;
     }
 
+    /**
+     * Writes the serialized assignments to the metastore under the given key and then waits until the
+     * metastore cluster time has moved past the timestamp observed before the write.
+     *
+     * @param assignmentsKey Metastore key to put the assignments under.
+     * @param assignments Assignments to serialize with {@code Assignments.toBytes} and publish.
+     */
+    private void publishAssignments(ByteArray assignmentsKey, Set<Assignment> 
assignments) {
+        // Remember the cluster time before the update so that the wait below can detect that it advanced.
+        long timestampBeforeUpdate = metastore.clusterTime().nowLong();
+
+        // NOTE(review): the condition presumably always holds because FAKE_KEY is never written, so the
+        // put branch is the one executed - confirm. The value returned by invoke() is discarded here.
+        metastore.invoke(
+                Conditions.notExists(FAKE_KEY),
+                put(assignmentsKey, Assignments.toBytes(assignments)),
+                noop()
+        );
+
+        // Block until cluster time reaches at least (timestampBeforeUpdate + 1).
+        assertThat(
+                
metastore.clusterTime().waitFor(HybridTimestamp.hybridTimestamp(timestampBeforeUpdate
 + 1)),
+                willCompleteSuccessfully()
+        );
+    }
+
+    /**
+     * Publishes the given assignments under the stable partition assignments key of {@code GROUP_1}.
+     *
+     * @param assignments Assignments to publish.
+     */
+    private void publishStableAssignments(Set<Assignment> assignments) {
+        publishAssignments(RebalanceUtil.stablePartAssignmentsKey(GROUP_1), 
assignments);
+    }
+
+    /**
+     * Publishes the given assignments under the pending partition assignments key of {@code GROUP_1}.
+     *
+     * @param assignments Assignments to publish.
+     */
+    private void publishPendingAssignments(Set<Assignment> assignments) {
+        publishAssignments(RebalanceUtil.pendingPartAssignmentsKey(GROUP_1), 
assignments);
+    }
+
+    /**
+     * Publishes the given assignments under the planned partition assignments key of {@code GROUP_1}.
+     *
+     * @param assignments Assignments to publish.
+     */
+    private void publishPlannedAssignments(Set<Assignment> assignments) {
+        publishAssignments(RebalanceUtil.plannedPartAssignmentsKey(GROUP_1), 
assignments);
+    }
+
     /**
      * Starts listening for a replica-become-primary event without restricting it to a replication group:
      * delegates to {@code listenReplicaBecomePrimaryEvent} with a {@code null} group id.
      *
      * @return The future returned by {@code listenReplicaBecomePrimaryEvent(null)}.
      */
     private CompletableFuture<PrimaryReplicaEventParameters> 
listenAnyReplicaBecomePrimaryEvent() {
         return listenReplicaBecomePrimaryEvent(null);
     }
@@ -669,7 +876,7 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
     private CompletableFuture<PrimaryReplicaEventParameters> 
listenReplicaBecomePrimaryEvent(@Nullable ReplicationGroupId groupId) {
         var eventParametersFuture = new 
CompletableFuture<PrimaryReplicaEventParameters>();
 
-        placementDriver.listen(PRIMARY_REPLICA_ELECTED, parameters -> {
+        leasePlacementDriver.listen(PRIMARY_REPLICA_ELECTED, parameters -> {
             if (groupId == null || groupId.equals(parameters.groupId())) {
                 eventParametersFuture.complete(parameters);
             }
@@ -701,4 +908,8 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
             }
         }, clockService);
     }
+
+    /**
+     * Creates a new {@code AssignmentsTracker} backed by this test's {@code metastore}.
+     *
+     * @return Newly constructed tracker; this method only constructs it.
+     */
+    private AssignmentsTracker createAssignmentsPlacementDriver() {
+        return new AssignmentsTracker(metastore);
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
index 4ad473891e..eca347e990 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table.distributed.wrappers;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
 import org.apache.ignite.internal.event.EventListener;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
@@ -62,4 +63,12 @@ abstract class DelegatingPlacementDriver implements 
PlacementDriver {
     public CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId 
grpId) {
         return delegate.previousPrimaryExpired(grpId);
     }
+
+    /**
+     * Forwards the call to the wrapped {@code delegate} with both arguments passed through unchanged.
+     *
+     * @param replicationGroupId Replication group id, passed to the delegate as is.
+     * @param clusterTimeToAwait Cluster time, passed to the delegate as is.
+     * @return The future returned by the delegate.
+     */
+    @Override
+    public CompletableFuture<TokenizedAssignments> getAssignments(
+            ReplicationGroupId replicationGroupId,
+            HybridTimestamp clusterTimeToAwait
+    ) {
+        return delegate.getAssignments(replicationGroupId, clusterTimeToAwait);
+    }
 }

Reply via email to