This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 48b6ebb981 IGNITE-22678 Java thin: fix ClientPartitionManager cache invalidation (#4175)
48b6ebb981 is described below

commit 48b6ebb981959460abf23ae8771d1a00101868af
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Mon Aug 5 16:25:33 2024 +0300

    IGNITE-22678 Java thin: fix ClientPartitionManager cache invalidation (#4175)
    
    * Remove hardcoded one-minute cache timeout
    * Use `ReliableChannel.partitionAssignmentTimestamp` to invalidate the cache (relies on `PARTITION_ASSIGNMENT_FLAG` response flag)
---
 .../ignite/client/handler/FakePlacementDriver.java |  4 ++
 .../client/table/ClientPartitionManager.java       | 27 +++++---
 .../apache/ignite/client/AbstractClientTest.java   |  6 +-
 .../ignite/client/ClientPartitionManagerTest.java  | 80 ++++++++++++++++++++++
 .../ignite/client/PartitionAwarenessTest.java      |  2 +-
 .../org/apache/ignite/client/ReconnectTest.java    |  7 +-
 .../java/org/apache/ignite/client/TestServer.java  | 18 +++--
 .../client/benchmarks/ClientPutGetBenchmark.java   |  3 +-
 .../org/apache/ignite/client/fakes/FakeIgnite.java |  9 ++-
 .../ignite/client/fakes/FakeIgniteTables.java      |  8 ++-
 .../ignite/client/fakes/FakeInternalTable.java     | 22 +++++-
 11 files changed, 150 insertions(+), 36 deletions(-)

diff --git 
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
 
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
index 62c0fc17e8..54da3e4648 100644
--- 
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
+++ 
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
@@ -113,6 +113,10 @@ public class FakePlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEve
         return failedFuture(new 
UnsupportedOperationException("getAssignments() is not supported in 
FakePlacementDriver yet."));
     }
 
+    public List<ReplicaMeta> primaryReplicas() {
+        return primaryReplicas;
+    }
+
     private static ReplicaMeta getReplicaMeta(String leaseholder, long 
leaseStartTime) {
         //noinspection serial
         return new ReplicaMeta() {
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientPartitionManager.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientPartitionManager.java
index 1e07c62174..7efa4999fe 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientPartitionManager.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientPartitionManager.java
@@ -21,8 +21,6 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.ignite.internal.client.TcpIgniteClient.unpackClusterNode;
 import static 
org.apache.ignite.internal.client.table.ClientTupleSerializer.getPartitionAwarenessProvider;
 
-import java.time.Instant;
-import java.time.temporal.ChronoUnit;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -42,14 +40,14 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Client partition manager implementation.
  */
-public class ClientPartitionManager implements PartitionManager {
+class ClientPartitionManager implements PartitionManager {
     private final ClientTable tbl;
 
     private final Lock lock = new ReentrantLock();
 
     private final Map<Partition, ClusterNode> cache = new HashMap<>();
 
-    private @Nullable Instant aliveUntil;
+    private long assignmentChangeTimestamp;
 
     ClientPartitionManager(ClientTable clientTable) {
         this.tbl = clientTable;
@@ -80,6 +78,8 @@ public class ClientPartitionManager implements 
PartitionManager {
             return completedFuture(cache);
         }
 
+        var currentTs = tbl.channel().partitionAssignmentTimestamp();
+
         return tbl.channel().serviceAsync(ClientOp.PRIMARY_REPLICAS_GET,
                 w -> w.out().packInt(tbl.tableId()),
                 r -> {
@@ -95,7 +95,7 @@ public class ClientPartitionManager implements 
PartitionManager {
 
                     return res;
                 })
-                .thenApply(this::updateCache);
+                .thenApply(map -> updateCache(map, currentTs));
     }
 
     @Override
@@ -115,12 +115,14 @@ public class ClientPartitionManager implements 
PartitionManager {
 
     private @Nullable ClusterNode getClusterNode(Partition partition) {
         lock.lock();
+
         try {
-            if (aliveUntil == null || Instant.now().isAfter(aliveUntil)) {
+            //noinspection resource
+            if (tbl.channel().partitionAssignmentTimestamp() > 
assignmentChangeTimestamp) {
                 cache.clear();
-                aliveUntil = null;
                 return null;
             }
+
             return cache.get(partition);
         } finally {
             lock.unlock();
@@ -129,23 +131,26 @@ public class ClientPartitionManager implements 
PartitionManager {
 
     private @Nullable Map<Partition, ClusterNode> lookupCache() {
         lock.lock();
+
         try {
-            if (aliveUntil == null || Instant.now().isAfter(aliveUntil)) {
+            //noinspection resource
+            if (tbl.channel().partitionAssignmentTimestamp() > 
assignmentChangeTimestamp) {
                 cache.clear();
-                aliveUntil = null;
                 return null;
             }
+
             return Map.copyOf(cache);
         } finally {
             lock.unlock();
         }
     }
 
-    private Map<Partition, ClusterNode> updateCache(Map<Partition, 
ClusterNode> map) {
+    private Map<Partition, ClusterNode> updateCache(Map<Partition, 
ClusterNode> map, long timestamp) {
         lock.lock();
+
         try {
             cache.putAll(map);
-            aliveUntil = Instant.now().plus(1, ChronoUnit.MINUTES);
+            assignmentChangeTimestamp = timestamp;
             return map;
         } finally {
             lock.unlock();
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
index 3c938a6b87..3de754b39e 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
@@ -45,7 +45,7 @@ public abstract class AbstractClientTest extends 
BaseIgniteAbstractTest {
 
     protected static TestServer testServer;
 
-    protected static Ignite server;
+    protected static FakeIgnite server;
 
     protected static IgniteClient client;
 
@@ -116,7 +116,7 @@ public abstract class AbstractClientTest extends 
BaseIgniteAbstractTest {
      */
     public static TestServer startServer(
             long idleTimeout,
-            Ignite ignite
+            FakeIgnite ignite
     ) {
         return startServer(idleTimeout, ignite, null);
     }
@@ -131,7 +131,7 @@ public abstract class AbstractClientTest extends 
BaseIgniteAbstractTest {
      */
     public static TestServer startServer(
             long idleTimeout,
-            Ignite ignite,
+            FakeIgnite ignite,
             String nodeName
     ) {
         return new TestServer(idleTimeout, ignite, null, null, nodeName, 
clusterId, null, null);
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/ClientPartitionManagerTest.java
 
b/modules/client/src/test/java/org/apache/ignite/client/ClientPartitionManagerTest.java
new file mode 100644
index 0000000000..8cae25386e
--- /dev/null
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/ClientPartitionManagerTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.client.fakes.FakeIgniteTables;
+import org.apache.ignite.client.handler.FakePlacementDriver;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.table.partition.HashPartition;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.partition.Partition;
+import org.apache.ignite.table.partition.PartitionManager;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for client partition manager.
+ */
+public class ClientPartitionManagerTest extends AbstractClientTest {
+    private static final String TABLE_NAME = "tbl1";
+
+    private static int tableId;
+
+    @BeforeEach
+    public void setUp() {
+        Table table = ((FakeIgniteTables) 
server.tables()).createTable(TABLE_NAME);
+
+        tableId = ((TableViewInternal) table).tableId();
+    }
+
+    @Test
+    public void testPrimaryReplicasCacheInvalidation() {
+        Table table = client.tables().table(TABLE_NAME);
+        PartitionManager partMgr = table.partitionManager();
+        HashPartition part0 = new HashPartition(0);
+        HashPartition part2 = new HashPartition(2);
+
+        // Before update.
+        Map<Partition, ClusterNode> map = 
partMgr.primaryReplicasAsync().join();
+        assertEquals(4, map.size());
+        assertEquals("s", map.get(part0).name());
+        assertEquals("s", partMgr.primaryReplicaAsync(part2).join().name());
+
+        // Update.
+        updateServerReplicas(List.of("foo", "bar", "baz", "qux"));
+        client.tables().tables(); // Perform a request to trigger cache 
invalidation.
+
+        // After update.
+        Map<Partition, ClusterNode> map2 = 
partMgr.primaryReplicasAsync().join();
+        assertEquals(4, map2.size());
+        assertEquals("foo", map2.get(part0).name());
+        assertEquals("baz", partMgr.primaryReplicaAsync(part2).join().name());
+    }
+
+    private static void updateServerReplicas(List<String> replicas) {
+        FakePlacementDriver placementDriver = testServer.placementDriver();
+        long leaseStartTime = new HybridClockImpl().nowLong();
+        placementDriver.setReplicas(replicas, tableId, leaseStartTime);
+    }
+}
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
 
b/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
index 5a1120fe32..84a9f4daf7 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
@@ -81,7 +81,7 @@ public class PartitionAwarenessTest extends 
AbstractClientTest {
 
     private static TestServer testServer2;
 
-    private static Ignite server2;
+    private static FakeIgnite server2;
 
     private static IgniteClient client2;
 
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
index a11bd877c2..6ff0c901a9 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
@@ -23,7 +23,6 @@ import static 
org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-import org.apache.ignite.Ignite;
 import org.apache.ignite.client.IgniteClient.Builder;
 import org.apache.ignite.client.fakes.FakeIgnite;
 import org.apache.ignite.client.fakes.FakeIgniteTables;
@@ -109,7 +108,7 @@ public class ReconnectTest extends BaseIgniteAbstractTest {
             server2.close();
             waitForConnections(client, 1);
 
-            Ignite ignite = new FakeIgnite();
+            FakeIgnite ignite = new FakeIgnite();
             server2 = new TestServer(0, ignite, null, null, "node3", 
AbstractClientTest.clusterId, null, 10903);
 
             if (reconnectEnabled) {
@@ -145,9 +144,9 @@ public class ReconnectTest extends BaseIgniteAbstractTest {
     }
 
     private void startTwoServers() {
-        Ignite ignite = new FakeIgnite();
+        FakeIgnite ignite = new FakeIgnite();
         server = new TestServer(0, ignite, null, null, "node1", 
AbstractClientTest.clusterId, null, 10901);
-        Ignite ignite1 = new FakeIgnite();
+        FakeIgnite ignite1 = new FakeIgnite();
         server2 = new TestServer(0, ignite1, null, null, "node2", 
AbstractClientTest.clusterId, null, 10902);
     }
 
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java 
b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
index f545f1e71c..015621ebd7 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
@@ -92,9 +92,7 @@ public class TestServer implements AutoCloseable {
 
     private final AuthenticationManager authenticationManager;
 
-    private final Ignite ignite;
-
-    private final FakePlacementDriver placementDriver = new 
FakePlacementDriver(FakeInternalTable.PARTITIONS);
+    private final FakeIgnite ignite;
 
     /**
      * Constructor.
@@ -104,7 +102,7 @@ public class TestServer implements AutoCloseable {
      */
     public TestServer(
             long idleTimeout,
-            Ignite ignite
+            FakeIgnite ignite
     ) {
         this(
                 idleTimeout,
@@ -123,7 +121,7 @@ public class TestServer implements AutoCloseable {
      */
     public TestServer(
             long idleTimeout,
-            Ignite ignite,
+            FakeIgnite ignite,
             @Nullable Function<Integer, Boolean> shouldDropConnection,
             @Nullable Function<Integer, Integer> responseDelay,
             @Nullable String nodeName,
@@ -153,7 +151,7 @@ public class TestServer implements AutoCloseable {
      */
     public TestServer(
             long idleTimeout,
-            Ignite ignite,
+            FakeIgnite ignite,
             @Nullable Function<Integer, Boolean> shouldDropConnection,
             @Nullable Function<Integer, Integer> responseDelay,
             @Nullable String nodeName,
@@ -234,10 +232,10 @@ public class TestServer implements AutoCloseable {
                 metrics,
                 authenticationManager,
                 clock,
-                placementDriver,
+                ignite.placementDriver(),
                 clientConnectorConfiguration)
                 : new ClientHandlerModule(
-                        ((FakeIgnite) ignite).queryEngine(),
+                        ignite.queryEngine(),
                         (IgniteTablesInternal) ignite.tables(),
                         (IgniteTransactionsImpl) ignite.transactions(),
                         (IgniteComputeInternal) ignite.compute(),
@@ -250,7 +248,7 @@ public class TestServer implements AutoCloseable {
                         new TestClockService(clock),
                         new AlwaysSyncedSchemaSyncService(),
                         new FakeCatalogService(FakeInternalTable.PARTITIONS),
-                        placementDriver,
+                        ignite.placementDriver(),
                         clientConnectorConfiguration,
                         new TestLowWatermark()
                 );
@@ -317,7 +315,7 @@ public class TestServer implements AutoCloseable {
      * @return Placement driver.
      */
     public FakePlacementDriver placementDriver() {
-        return placementDriver;
+        return ignite.placementDriver();
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/benchmarks/ClientPutGetBenchmark.java
 
b/modules/client/src/test/java/org/apache/ignite/client/benchmarks/ClientPutGetBenchmark.java
index 64656cf17b..677d5b172a 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/benchmarks/ClientPutGetBenchmark.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/benchmarks/ClientPutGetBenchmark.java
@@ -19,7 +19,6 @@ package org.apache.ignite.client.benchmarks;
 
 import io.netty.util.ResourceLeakDetector;
 import io.netty.util.ResourceLeakDetector.Level;
-import org.apache.ignite.Ignite;
 import org.apache.ignite.client.IgniteClient;
 import org.apache.ignite.client.TestServer;
 import org.apache.ignite.client.fakes.FakeIgnite;
@@ -67,7 +66,7 @@ public class ClientPutGetBenchmark {
 
     private TestServer testServer;
 
-    private Ignite ignite;
+    private FakeIgnite ignite;
 
     private IgniteClient client;
 
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
index 1d6678d73c..37d1ea2924 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.catalog.IgniteCatalog;
+import org.apache.ignite.client.handler.FakePlacementDriver;
 import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.internal.catalog.sql.IgniteCatalogSqlImpl;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -50,6 +51,8 @@ public class FakeIgnite implements Ignite {
 
     private final IgniteTables tables;
 
+    private final FakePlacementDriver placementDriver = new 
FakePlacementDriver(FakeInternalTable.PARTITIONS);
+
     /**
      * Default constructor.
      */
@@ -65,7 +68,7 @@ public class FakeIgnite implements Ignite {
     public FakeIgnite(String name) {
         this.name = name;
         this.compute = new FakeCompute(name, this);
-        this.tables = new FakeIgniteTables(compute);
+        this.tables = new FakeIgniteTables(compute, placementDriver);
     }
 
     /** {@inheritDoc} */
@@ -122,4 +125,8 @@ public class FakeIgnite implements Ignite {
     public HybridTimestampTracker timestampTracker() {
         return hybridTimestampTracker;
     }
+
+    public FakePlacementDriver placementDriver() {
+        return placementDriver;
+    }
 }
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
index 962bdf60ea..20f4d38c2a 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
@@ -26,6 +26,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
+import org.apache.ignite.client.handler.FakePlacementDriver;
 import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRowConverter;
@@ -73,8 +74,11 @@ public class FakeIgniteTables implements 
IgniteTablesInternal {
 
     private final IgniteCompute compute;
 
-    FakeIgniteTables(IgniteCompute compute) {
+    private final FakePlacementDriver placementDriver;
+
+    FakeIgniteTables(IgniteCompute compute, FakePlacementDriver 
placementDriver) {
         this.compute = compute;
+        this.placementDriver = placementDriver;
     }
 
     /**
@@ -220,7 +224,7 @@ public class FakeIgniteTables implements 
IgniteTablesInternal {
         };
 
         return new TableImpl(
-                new FakeInternalTable(name, id, keyExtractor, compute),
+                new FakeInternalTable(name, id, keyExtractor, compute, 
placementDriver),
                 schemaReg,
                 new HeapLockManager(),
                 new SchemaVersions() {
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 2789d783dc..eb3bc3c6ee 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -35,6 +35,7 @@ import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.BiConsumer;
 import javax.naming.OperationNotSupportedException;
+import org.apache.ignite.client.handler.FakePlacementDriver;
 import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.compute.JobDescriptor;
 import org.apache.ignite.compute.JobTarget;
@@ -43,6 +44,7 @@ import 
org.apache.ignite.internal.compute.streamer.StreamerReceiverJob;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.network.ClusterNodeImpl;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
@@ -80,6 +82,8 @@ public class FakeInternalTable implements InternalTable, 
StreamerReceiverRunner
 
     private final IgniteCompute compute;
 
+    private final FakePlacementDriver placementDriver;
+
     /** Data access listener. */
     private BiConsumer<String, Object> dataAccessListener;
 
@@ -89,12 +93,19 @@ public class FakeInternalTable implements InternalTable, 
StreamerReceiverRunner
      * @param tableName Name.
      * @param tableId Id.
      * @param keyExtractor Function which converts given binary row to an 
index key.
+     * @param placementDriver Placement driver.
      */
-    FakeInternalTable(String tableName, int tableId, ColumnsExtractor 
keyExtractor, IgniteCompute compute) {
+    FakeInternalTable(
+            String tableName,
+            int tableId,
+            ColumnsExtractor keyExtractor,
+            IgniteCompute compute,
+            FakePlacementDriver placementDriver) {
         this.tableName = tableName;
         this.tableId = tableId;
         this.keyExtractor = keyExtractor;
         this.compute = compute;
+        this.placementDriver = placementDriver;
     }
 
     @Override
@@ -506,8 +517,15 @@ public class FakeInternalTable implements InternalTable, 
StreamerReceiverRunner
 
     @Override
     public CompletableFuture<ClusterNode> partitionLocation(TablePartitionId 
partitionId) {
+        List<ReplicaMeta> replicaMetas = placementDriver.primaryReplicas();
+        ReplicaMeta replica = replicaMetas.get(partitionId.partitionId());
+
+        //noinspection DataFlowIssue
         return completedFuture(
-                new ClusterNodeImpl("server-1", "server-1", new 
NetworkAddress("localhost", 10800)));
+                new ClusterNodeImpl(
+                        replica.getLeaseholderId(),
+                        replica.getLeaseholder(),
+                        new NetworkAddress("localhost", 10800)));
     }
 
     @Override

Reply via email to