This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c0bbb420d9 IGNITE-18742 Implement initial lease granting on placement 
driver side (#1871)
c0bbb420d9 is described below

commit c0bbb420d9b4083853b97947714e8da4244c2d00
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Thu Apr 13 23:06:58 2023 +0400

    IGNITE-18742 Implement initial lease granting on placement driver side 
(#1871)
---
 .../ignite/internal/hlc/HybridTimestamp.java       |   3 +
 modules/placement-driver/build.gradle              |   3 +-
 .../internal/placementdriver/ActiveActorTest.java  |   5 +
 .../PlacementDriverManagerTest.java                | 173 +++++++++++++++++++--
 .../ignite/internal/placementdriver/Lease.java     |  81 ----------
 .../internal/placementdriver/LeaseUpdater.java     | 127 ++++++++++++---
 .../placementdriver/PlacementDriverManager.java    |  38 +++--
 .../internal/placementdriver/leases/Lease.java     | 146 +++++++++++++++++
 .../placementdriver/{ => leases}/LeaseTracker.java |   5 +-
 .../negotiation/LeaseAgreement.java                | 100 ++++++++++++
 .../negotiation/LeaseNegotiator.java               | 127 +++++++++++++++
 11 files changed, 678 insertions(+), 130 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
index 6ac7f6e4d5..87e47d49d4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
@@ -33,6 +33,9 @@ public final class HybridTimestamp implements 
Comparable<HybridTimestamp>, Seria
     /** A constant holding the maximum value a {@code HybridTimestamp} can 
have. */
     public static final HybridTimestamp MAX_VALUE = new 
HybridTimestamp(Long.MAX_VALUE, Integer.MAX_VALUE);
 
+    /** The constant holds the minimum value which {@code HybridTimestamp} 
might formally have. */
+    public static final HybridTimestamp MIN_VALUE = new HybridTimestamp(1L, 
-1);
+
     /**
      * Cluster cLock skew. The constant determines the undefined inclusive 
interval to compares timestamp from various nodes.
      * TODO: IGNITE-18978 Method to comparison timestamps with clock skew.
diff --git a/modules/placement-driver/build.gradle 
b/modules/placement-driver/build.gradle
index b12cf94ab0..f494b7ff26 100644
--- a/modules/placement-driver/build.gradle
+++ b/modules/placement-driver/build.gradle
@@ -22,8 +22,9 @@ apply from: 
"$rootDir/buildscripts/java-integration-test.gradle"
 apply from: "$rootDir/buildscripts/java-test-fixtures.gradle"
 
 dependencies {
-    implementation project(':ignite-metastorage-api')
+    api project(':ignite-placement-driver-api')
 
+    implementation project(':ignite-metastorage-api')
     implementation project(':ignite-core')
     implementation project(':ignite-configuration-api')
     implementation project(':ignite-raft-api')
diff --git 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
index 0f7eeb7a41..8a4292b97a 100644
--- 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
+++ 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
@@ -27,6 +27,8 @@ import static 
org.apache.ignite.utils.ClusterServiceTestUtils.clusterService;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
 
 import java.nio.file.Path;
 import java.util.HashMap;
@@ -53,6 +55,7 @@ import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
 import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.PeersAndLearners;
@@ -438,6 +441,8 @@ public class ActiveActorTest extends IgniteAbstractTest {
             int nodes,
             int clientPort
     ) {
+        when(msm.invoke(any(), any(Operation.class), 
any(Operation.class))).thenReturn(completedFuture(true));
+
         List<NetworkAddress> addresses = getNetworkAddresses(nodes);
 
         var nodeFinder = new StaticNodeFinder(addresses);
diff --git 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
index eda296e2ab..355badbcf5 100644
--- 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
+++ 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
@@ -26,6 +26,8 @@ import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeN
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.utils.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
 import static org.apache.ignite.lang.ByteArray.fromString;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -38,7 +40,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -57,6 +61,10 @@ import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
 import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
+import org.apache.ignite.internal.placementdriver.leases.Lease;
+import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessage;
+import 
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessageGroup;
+import 
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.PeersAndLearners;
@@ -66,6 +74,7 @@ import 
org.apache.ignite.internal.schema.configuration.ExtendedTableChange;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
@@ -78,6 +87,7 @@ import 
org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import org.apache.ignite.utils.ClusterServiceTestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -88,13 +98,23 @@ import org.junit.jupiter.api.extension.ExtendWith;
 @ExtendWith(ConfigurationExtension.class)
 public class PlacementDriverManagerTest extends IgniteAbstractTest {
     public static final int PORT = 1234;
+
+    private static final PlacementDriverMessagesFactory 
PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();
+
     private String nodeName;
 
-    HybridClock clock = new HybridClockImpl();
+    /** Another node name. The node name is matched to {@code 
anotherClusterService}. */
+    private String anotherNodeName;
+
+    private HybridClock clock = new HybridClockImpl();
+
     private VaultManager vaultManager;
 
     private ClusterService clusterService;
 
+    /** This service is used to redirect a lease proposal. */
+    private ClusterService anotherClusterService;
+
     private Loza raftManager;
 
     @InjectConfiguration
@@ -112,11 +132,18 @@ public class PlacementDriverManagerTest extends 
IgniteAbstractTest {
 
     private TestInfo testInfo;
 
+    /** This closure handles {@link LeaseGrantedMessage} to check the 
placement driver manager behavior. */
+    private BiConsumer<LeaseGrantedMessage, String> leaseGrantHandler;
+
     @BeforeEach
     public void beforeTest(TestInfo testInfo) throws NodeStoppingException {
         this.nodeName = testNodeName(testInfo, PORT);
+        this.anotherNodeName = testNodeName(testInfo, PORT + 1);
         this.testInfo = testInfo;
 
+        assertTrue(nodeName.hashCode() < anotherNodeName.hashCode(),
+                "Node for the first lease grant message should be determined 
strictly.");
+
         startPlacementDriverManager();
     }
 
@@ -126,6 +153,29 @@ public class PlacementDriverManagerTest extends 
IgniteAbstractTest {
         var nodeFinder = new StaticNodeFinder(Collections.singletonList(new 
NetworkAddress("localhost", PORT)));
 
         clusterService = ClusterServiceTestUtils.clusterService(testInfo, 
PORT, nodeFinder);
+        anotherClusterService = 
ClusterServiceTestUtils.clusterService(testInfo, PORT + 1, nodeFinder);
+
+        
anotherClusterService.messagingService().addMessageHandler(PlacementDriverMessageGroup.class,
 (msg, sender, correlationId) -> {
+            assert msg instanceof LeaseGrantedMessage : "Message type is 
unexpected [type=" + msg.getClass().getSimpleName() + ']';
+
+            log.info("Lease is being granted [actor={}, recipient={}, 
force={}]", sender, anotherNodeName,
+                    ((LeaseGrantedMessage) msg).force());
+
+            if (leaseGrantHandler != null) {
+                leaseGrantHandler.accept((LeaseGrantedMessage) msg, sender);
+            }
+        });
+
+        
clusterService.messagingService().addMessageHandler(PlacementDriverMessageGroup.class,
 (msg, sender, correlationId) -> {
+            assert msg instanceof LeaseGrantedMessage : "Message type is 
unexpected [type=" + msg.getClass().getSimpleName() + ']';
+
+            log.info("Lease is being granted [actor={}, recipient={}, 
force={}]", sender, nodeName,
+                    ((LeaseGrantedMessage) msg).force());
+
+            if (leaseGrantHandler != null) {
+                leaseGrantHandler.accept((LeaseGrantedMessage) msg, sender);
+            }
+        });
 
         ClusterManagementGroupManager cmgManager = 
mock(ClusterManagementGroupManager.class);
 
@@ -183,6 +233,7 @@ public class PlacementDriverManagerTest extends 
IgniteAbstractTest {
 
         vaultManager.start();
         clusterService.start();
+        anotherClusterService.start();
         raftManager.start();
         metaStorageManager.start();
         placementDriverManager.start();
@@ -205,6 +256,7 @@ public class PlacementDriverManagerTest extends 
IgniteAbstractTest {
         placementDriverManager.stop();
         metaStorageManager.stop();
         raftManager.stop();
+        anotherClusterService.stop();
         clusterService.stop();
         vaultManager.stop();
     }
@@ -237,14 +289,15 @@ public class PlacementDriverManagerTest extends 
IgniteAbstractTest {
     public void testLeaseCreate() throws Exception {
         TablePartitionId grpPart0 = createTableAssignment();
 
-        checkLeaseCreated(grpPart0);
+        checkLeaseCreated(grpPart0, false);
     }
 
     @Test
+    @WithSystemProperty(key = "IGNITE_LONG_LEASE", value = "200")
     public void testLeaseRenew() throws Exception {
         TablePartitionId grpPart0 = createTableAssignment();
 
-        checkLeaseCreated(grpPart0);
+        checkLeaseCreated(grpPart0, false);
 
         var leaseFut = 
metaStorageManager.get(fromString(PLACEMENTDRIVER_PREFIX + grpPart0));
 
@@ -257,16 +310,17 @@ public class PlacementDriverManagerTest extends 
IgniteAbstractTest {
 
             Lease leaseRenew = ByteUtils.fromBytes(fut.join().value());
 
-            return 
lease.getLeaseExpirationTime().compareTo(leaseRenew.getLeaseExpirationTime()) < 
0;
+            return 
lease.getExpirationTime().compareTo(leaseRenew.getExpirationTime()) < 0;
 
         }, 10_000));
     }
 
     @Test
+    @WithSystemProperty(key = "IGNITE_LONG_LEASE", value = "200")
     public void testLeaseholderUpdate() throws Exception {
         TablePartitionId grpPart0 = createTableAssignment();
 
-        checkLeaseCreated(grpPart0);
+        checkLeaseCreated(grpPart0, false);
 
         Set<Assignment> assignments = Set.of();
 
@@ -277,7 +331,7 @@ public class PlacementDriverManagerTest extends 
IgniteAbstractTest {
 
             Lease lease = ByteUtils.fromBytes(fut.join().value());
 
-            return lease.getLeaseExpirationTime().compareTo(clock.now()) < 0;
+            return lease.getExpirationTime().compareTo(clock.now()) < 0;
 
         }, 10_000));
 
@@ -290,37 +344,129 @@ public class PlacementDriverManagerTest extends 
IgniteAbstractTest {
 
             Lease lease = ByteUtils.fromBytes(fut.join().value());
 
-            return lease.getLeaseExpirationTime().compareTo(clock.now()) > 0;
+            return lease.getExpirationTime().compareTo(clock.now()) > 0;
 
         }, 10_000));
     }
 
+    @Test
+    @Disabled("IGNITE-18958 Implement handling of lease grant responses on 
placement driver side")
+    public void testLeaseAccepted() throws Exception {
+        TablePartitionId grpPart0 = createTableAssignment();
+
+        checkLeaseCreated(grpPart0, true);
+    }
+
+    @Test
+    @Disabled("IGNITE-18958 Implement handling of lease grant responses on 
placement driver side")
+    public void testLeaseForceAccepted() throws Exception {
+        leaseGrantHandler = (req, sender) ->
+                PLACEMENT_DRIVER_MESSAGES_FACTORY
+                        .leaseGrantedMessageResponse()
+                        .accepted(req.force())
+                        .build();
+
+        TablePartitionId grpPart0 = createTableAssignment();
+
+        checkLeaseCreated(grpPart0, true);
+    }
+
+    @Test
+    public void testExceptionOnAcceptance() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+
+        leaseGrantHandler = (req, sender) -> {
+            latch.countDown();
+
+            throw new RuntimeException("test");
+        };
+
+        TablePartitionId grpPart0 = createTableAssignment();
+
+        checkLeaseCreated(grpPart0, false);
+
+        latch.await();
+
+        Lease lease = checkLeaseCreated(grpPart0, false);
+
+        assertFalse(lease.isAccepted());
+    }
+
+    @Test
+    @Disabled("IGNITE-18958 Implement handling of lease grant responses on 
placement driver side")
+    public void testRedirectionAcceptance() throws Exception {
+        leaseGrantHandler = (req, sender) ->
+                PLACEMENT_DRIVER_MESSAGES_FACTORY
+                        .leaseGrantedMessageResponse()
+                        .accepted(false)
+                        .redirectProposal(anotherNodeName)
+                        .build();
+
+        TablePartitionId grpPart0 = createTableAssignment();
+
+        checkLeaseCreated(grpPart0, true);
+    }
+
     @Test
     public void testLeaseRestore() throws Exception {
         TablePartitionId grpPart0 = createTableAssignment();
 
-        checkLeaseCreated(grpPart0);
+        checkLeaseCreated(grpPart0, false);
 
         stopPlacementDriverManager();
         startPlacementDriverManager();
 
-        checkLeaseCreated(grpPart0);
+        checkLeaseCreated(grpPart0, false);
+    }
+
+    @Test
+    public void testLeaseMatchGrantMessage() throws Exception {
+        var leaseGrantReqRef = new AtomicReference<LeaseGrantedMessage>();
+
+        leaseGrantHandler = (req, sender) -> {
+            leaseGrantReqRef.set(req);
+        };
+
+        TablePartitionId grpPart0 = createTableAssignment();
+
+        Lease lease = checkLeaseCreated(grpPart0, false);
+
+        assertTrue(waitForCondition(() -> leaseGrantReqRef.get() != null, 
10_000));
+
+        assertEquals(leaseGrantReqRef.get().leaseStartTime(), 
lease.getStartTime());
+        assertEquals(leaseGrantReqRef.get().leaseExpirationTime(), 
lease.getExpirationTime());
     }
 
     /**
      * Checks if a group lease is created during the timeout.
      *
      * @param grpPartId Replication group id.
+     * @param waitAccept Await a lease with the accepted flag.
+     * @return A lease that is read from Meta storage.
      * @throws InterruptedException If the waiting is interrupted.
      */
-    private void checkLeaseCreated(TablePartitionId grpPartId) throws 
InterruptedException {
+    private Lease checkLeaseCreated(TablePartitionId grpPartId, boolean 
waitAccept) throws InterruptedException {
+        AtomicReference<Lease> leaseRef = new AtomicReference<>();
+
         assertTrue(waitForCondition(() -> {
             var leaseFut = 
metaStorageManager.get(fromString(PLACEMENTDRIVER_PREFIX + grpPartId));
 
             var leaseEntry = leaseFut.join();
 
-            return leaseEntry != null && !leaseEntry.empty();
+            if (leaseEntry != null && !leaseEntry.empty()) {
+                Lease lease = ByteUtils.fromBytes(leaseEntry.value());
+
+                if (!waitAccept) {
+                    leaseRef.set(lease);
+                } else if (lease.isAccepted()) {
+                    leaseRef.set(lease);
+                }
+            }
+
+            return leaseRef.get() != null;
         }, 10_000));
+
+        return leaseRef.get();
     }
 
     /**
@@ -332,7 +478,7 @@ public class PlacementDriverManagerTest extends 
IgniteAbstractTest {
     private TablePartitionId createTableAssignment() throws Exception {
         AtomicReference<UUID> tblIdRef = new AtomicReference<>();
 
-        List<Set<Assignment>> assignments = 
AffinityUtils.calculateAssignments(Collections.singleton(nodeName), 1, 1);
+        List<Set<Assignment>> assignments = 
AffinityUtils.calculateAssignments(List.of(nodeName, anotherNodeName), 1, 2);
 
         int zoneId = createZone();
 
@@ -350,6 +496,7 @@ public class PlacementDriverManagerTest extends 
IgniteAbstractTest {
         var grpPart0 = new TablePartitionId(tblIdRef.get(), 0);
 
         log.info("Fake table created [id={}, repGrp={}]", tblIdRef.get(), 
grpPart0);
+
         return grpPart0;
     }
 
@@ -362,7 +509,7 @@ public class PlacementDriverManagerTest extends 
IgniteAbstractTest {
         dstZnsCfg.distributionZones().change(zones -> {
             zones.create("zone1", ch -> {
                 ch.changePartitions(1);
-                ch.changeReplicas(1);
+                ch.changeReplicas(2);
                 ch.changeZoneId(DEFAULT_ZONE_ID + 1);
             });
         }).join();
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/Lease.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/Lease.java
deleted file mode 100644
index e53428983b..0000000000
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/Lease.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.placementdriver;
-
-import java.io.Serializable;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.network.ClusterNode;
-
-/**
- * A lease representation in memory.
- * The real lease is stored in Meta storage.
- */
-public class Lease implements Serializable {
-    /** The object is used when nothing holds the lease. */
-    public static Lease EMPTY_LEASE = new Lease(null, null);
-
-    /** A node that holds a lease until {@code stopLeas}. */
-    private final ClusterNode leaseholder;
-
-    /** Timestamp to expiration the lease. */
-    private final HybridTimestamp leaseExpirationTime;
-
-    /**
-     * Default constructor.
-     */
-    public Lease() {
-        this(null, null);
-    }
-
-    /**
-     * The constructor.
-     *
-     * @param leaseholder Lease holder.
-     * @param leaseExpirationTime Lease expiration timestamp.
-     */
-    public Lease(ClusterNode leaseholder, HybridTimestamp leaseExpirationTime) 
{
-        this.leaseholder = leaseholder;
-        this.leaseExpirationTime = leaseExpirationTime;
-    }
-
-    /**
-     * Get a leaseholder node.
-     *
-     * @return Leaseholder or {@code null} if nothing holds the lease.
-     */
-    public ClusterNode getLeaseholder() {
-        return leaseholder;
-    }
-
-    /**
-     * Gets a lease expiration timestamp.
-     *
-     * @return Lease expiration timestamp or {@code null} if nothing holds the 
lease.
-     */
-    public HybridTimestamp getLeaseExpirationTime() {
-        return leaseExpirationTime;
-    }
-
-    @Override
-    public String toString() {
-        return "Lease {"
-                + "leaseholder=" + leaseholder
-                + ", leaseExpirationTime=" + leaseExpirationTime
-                + '}';
-    }
-}
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
index 223a8f87ef..1b4187d264 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
@@ -22,7 +22,6 @@ import static 
org.apache.ignite.internal.metastorage.dsl.Conditions.or;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.value;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
-import static org.apache.ignite.internal.placementdriver.Lease.EMPTY_LEASE;
 import static 
org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_PREFIX;
 
 import java.util.Map;
@@ -35,13 +34,18 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.placementdriver.leases.Lease;
+import org.apache.ignite.internal.placementdriver.leases.LeaseTracker;
+import org.apache.ignite.internal.placementdriver.negotiation.LeaseNegotiator;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteSystemProperties;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
 
 /**
  * A processor to manger leases. The process is started when placement driver 
activates and stopped when it deactivates.
@@ -54,7 +58,13 @@ public class LeaseUpdater {
     private static final long UPDATE_LEASE_MS = 200L;
 
     /** Lease holding interval. */
-    private static final long LEASE_PERIOD = 10 * UPDATE_LEASE_MS;
+    public static final long LEASE_INTERVAL = 10 * UPDATE_LEASE_MS;
+
+    /** Long lease interval. The interval is used between lease granting 
attempts. */
+    private final long longLeaseInterval;
+
+    /** Cluster service. */
+    private final ClusterService clusterService;
 
     /** Meta storage manager. */
     private final MetaStorageManager msManager;
@@ -77,20 +87,25 @@ public class LeaseUpdater {
     /** Dedicated thread to update leases. */
     private volatile Thread updaterTread;
 
+    /** Processor to communicate with the leaseholder to negotiate the lease. 
*/
+    private LeaseNegotiator leaseNegotiator;
+
     /** Node name. */
     private String nodeName;
 
     /**
      * The constructor.
      *
+     * @param clusterService Cluster service.
      * @param vaultManager Vault manager.
-     * @param msManager Metastorage manager.
+     * @param msManager Meta storage manager.
      * @param topologyService Topology service.
      * @param tablesConfiguration Tables configuration.
      * @param leaseTracker Lease tracker.
      * @param clock Cluster clock.
      */
     public LeaseUpdater(
+            ClusterService clusterService,
             VaultManager vaultManager,
             MetaStorageManager msManager,
             LogicalTopologyService topologyService,
@@ -99,10 +114,12 @@ public class LeaseUpdater {
             LeaseTracker leaseTracker,
             HybridClock clock
     ) {
+        this.clusterService = clusterService;
         this.msManager = msManager;
         this.leaseTracker = leaseTracker;
         this.clock = clock;
 
+        this.longLeaseInterval = 
IgniteSystemProperties.getLong("IGNITE_LONG_LEASE", 120_000L);
         this.assignmentsTracker = new AssignmentsTracker(vaultManager, 
msManager, tablesConfiguration, distributionZonesConfiguration);
         this.topologyTracker = new TopologyTracker(topologyService);
         this.updater = new Updater();
@@ -130,6 +147,8 @@ public class LeaseUpdater {
      * Activates a lease updater to renew leases.
      */
     public void activate() {
+        leaseNegotiator = new LeaseNegotiator(clusterService);
+
         //TODO: IGNITE-18879 Implement lease maintenance.
         updaterTread = new Thread(updater, 
NamedThreadFactory.threadPrefix(nodeName, "lease-updater"));
 
@@ -146,6 +165,8 @@ public class LeaseUpdater {
 
             updaterTread = null;
         }
+
+        leaseNegotiator = null;
     }
 
     /**
@@ -156,8 +177,16 @@ public class LeaseUpdater {
      */
     private ClusterNode nextLeaseHolder(Set<Assignment> assignments) {
         //TODO: IGNITE-18879 Implement more intellectual algorithm to choose a 
node.
+        String consistentId = null;
+
         for (Assignment assignment : assignments) {
-            ClusterNode candidate = 
topologyTracker.nodeByConsistentId(assignment.consistentId());
+            if (consistentId == null || consistentId.hashCode() > 
assignment.consistentId().hashCode()) {
+                consistentId = assignment.consistentId();
+            }
+        }
+
+        if (consistentId != null) {
+            ClusterNode candidate = 
topologyTracker.nodeByConsistentId(consistentId);
 
             if (candidate != null) {
                 return candidate;
@@ -174,29 +203,30 @@ public class LeaseUpdater {
         @Override
         public void run() {
             while (updaterTread != null && !updaterTread.isInterrupted()) {
+                long outdatedLeaseThreshold = clock.now().getPhysical() + 
LEASE_INTERVAL / 2;
+
                 for (Map.Entry<ReplicationGroupId, Set<Assignment>> entry : 
assignmentsTracker.assignments().entrySet()) {
                     ReplicationGroupId grpId = entry.getKey();
 
                     Lease lease = leaseTracker.getLease(grpId);
 
-                    HybridTimestamp now = clock.now();
-
-                    // Nothing holds the lease.
-                    if (lease == EMPTY_LEASE
-                            // The lease is near to expiration.
-                            || now.getPhysical() > 
(lease.getLeaseExpirationTime().getPhysical() - LEASE_PERIOD / 2)) {
+                    // The lease is expired or close to this.
+                    if (lease.getExpirationTime().getPhysical() < 
outdatedLeaseThreshold) {
                         ClusterNode candidate = 
nextLeaseHolder(entry.getValue());
 
                         if (candidate == null) {
                             continue;
                         }
 
-                        if (isReplicationGroupUpdateLeaseholder(lease, 
candidate)) {
-                            updateLeaseInMetaStorage(grpId, lease, candidate);
+                        // We can't prolong the expired lease because we 
already have an interval of time when the lease was not active,
+                        // so we must start ne negotiation round from the 
beginning; the same we do for the groups that don't have
+                        // leaseholders at all.
+                        if (isLeaseOutdated(lease)) {
                             // New lease is granting.
-                        } else if (candidate.equals(lease.getLeaseholder())) {
-                            updateLeaseInMetaStorage(grpId, lease, candidate);
+                            writeNewLeasInMetaStorage(grpId, lease, candidate);
+                        } else if (lease.isAccepted() && 
candidate.equals(lease.getLeaseholder())) {
                             // Old lease is renewing.
+                            prolongLeaseInMetaStorage(grpId, lease);
                         }
                     }
                 }
@@ -210,39 +240,90 @@ public class LeaseUpdater {
         }
 
         /**
-         * Writes a lease in Meta storage.
+         * Writes a new lease in Meta storage.
          *
          * @param grpId Replication group id.
          * @param lease Old lease to apply CAS in Meta storage.
          * @param candidate Lease candidate.
          */
-        private void updateLeaseInMetaStorage(ReplicationGroupId grpId, Lease 
lease, ClusterNode candidate) {
+        private void writeNewLeasInMetaStorage(ReplicationGroupId grpId, Lease 
lease, ClusterNode candidate) {
             var leaseKey = ByteArray.fromString(PLACEMENTDRIVER_PREFIX + 
grpId);
-            var newTs = new HybridTimestamp(clock.now().getPhysical() + 
LEASE_PERIOD, 0);
+
+            HybridTimestamp startTs = clock.now();
+
+            var expirationTs = new HybridTimestamp(startTs.getPhysical() + 
longLeaseInterval, 0);
 
             byte[] leaseRaw = ByteUtils.toBytes(lease);
 
-            Lease renewedLease = new Lease(candidate, newTs);
+            Lease renewedLease = new Lease(candidate, startTs, expirationTs);
 
             msManager.invoke(
                     or(notExists(leaseKey), value(leaseKey).eq(leaseRaw)),
                     put(leaseKey, ByteUtils.toBytes(renewedLease)),
                     noop()
+            ).thenAccept(isCreated -> {
+                if (isCreated) {
+                    boolean force = candidate.equals(lease.getLeaseholder());
+
+                    leaseNegotiator.negotiate(grpId, renewedLease, force);
+                }
+            });
+        }
+
+        /**
+         * Writes a prolong lease in Meta storage.
+         *
+         * @param grpId Replication group id.
+         * @param lease Lease to prolong.
+         */
+        private void prolongLeaseInMetaStorage(ReplicationGroupId grpId, Lease 
lease) {
+            var leaseKey = ByteArray.fromString(PLACEMENTDRIVER_PREFIX + 
grpId);
+            var newTs = new HybridTimestamp(clock.now().getPhysical() + 
LEASE_INTERVAL, 0);
+
+            byte[] leaseRaw = ByteUtils.toBytes(lease);
+
+            Lease renewedLease = lease.prolongLease(newTs);
+
+            msManager.invoke(
+                    value(leaseKey).eq(leaseRaw),
+                    put(leaseKey, ByteUtils.toBytes(renewedLease)),
+                    noop()
+            );
+        }
+
+        /**
+         * Writes an accepted lease in Meta storage. After the lease will be 
written to Meta storage,
+         * the lease becomes available to all components.
+         *
+         * @param grpId Replication group id.
+         * @param lease Lease to accept.
+         */
+        private void publishLease(ReplicationGroupId grpId, Lease lease) {
+            var leaseKey = ByteArray.fromString(PLACEMENTDRIVER_PREFIX + 
grpId);
+            var newTs = new HybridTimestamp(clock.now().getPhysical() + 
LEASE_INTERVAL, 0);
+
+            byte[] leaseRaw = ByteUtils.toBytes(lease);
+
+            Lease renewedLease = lease.acceptLease(newTs);
+
+            msManager.invoke(
+                    value(leaseKey).eq(leaseRaw),
+                    put(leaseKey, ByteUtils.toBytes(renewedLease)),
+                    noop()
             );
         }
 
         /**
-         * Checks that a leaseholder candidate can take a lease on the 
replication group.
+         * Checks that the lease is outdated.
+         * {@link Lease#EMPTY_LEASE} is always outdated.
          *
          * @param lease Lease.
-         * @param candidate The node is a leaseholder candidate.
          * @return True when the candidate can be a leaseholder, otherwise 
false.
          */
-        private boolean isReplicationGroupUpdateLeaseholder(Lease lease, 
ClusterNode candidate) {
+        private boolean isLeaseOutdated(Lease lease) {
             HybridTimestamp now = clock.now();
 
-            return lease == EMPTY_LEASE
-                    || (!candidate.equals(lease.getLeaseholder()) && 
now.after(lease.getLeaseExpirationTime()));
+            return now.after(lease.getExpirationTime());
         }
     }
 }
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
index e0894a7abb..0680e5bc73 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.placementdriver;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -30,6 +31,7 @@ import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZo
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.placementdriver.leases.LeaseTracker;
 import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.RaftManager;
 import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
@@ -38,6 +40,7 @@ import 
org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
@@ -49,7 +52,7 @@ import org.jetbrains.annotations.TestOnly;
  * The another role of the manager is providing a node, which is leaseholder 
at the moment, for a particular replication group.
  */
 public class PlacementDriverManager implements IgniteComponent {
-    static final String PLACEMENTDRIVER_PREFIX = "placementdriver.lease.";
+    public static final String PLACEMENTDRIVER_PREFIX = 
"placementdriver.lease.";
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
@@ -118,6 +121,7 @@ public class PlacementDriverManager implements 
IgniteComponent {
         this.raftClientFuture = new CompletableFuture<>();
         this.leaseTracker = new LeaseTracker(vaultManager, metaStorageMgr);
         this.leaseUpdater = new LeaseUpdater(
+                clusterService,
                 vaultManager,
                 metaStorageMgr,
                 logicalTopologyService,
@@ -165,13 +169,20 @@ public class PlacementDriverManager implements 
IgniteComponent {
     /** {@inheritDoc} */
     @Override
     public void beforeNodeStop() {
-        withRaftClientIfPresent(c -> {
-            c.unsubscribeLeader().join();
+        if (!busyLock.enterBusy()) {
+            throw new IgniteInternalException(NODE_STOPPING_ERR, new 
NodeStoppingException());
+        }
+        try {
+            withRaftClientIfPresent(c -> {
+                c.unsubscribeLeader().join();
 
-            leaseUpdater.deInit();
-        });
+                leaseUpdater.deInit();
+            });
 
-        leaseTracker.stopTrack();
+            leaseTracker.stopTrack();
+        } finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /** {@inheritDoc} */
@@ -195,10 +206,17 @@ public class PlacementDriverManager implements 
IgniteComponent {
     }
 
     private void onLeaderChange(ClusterNode leader, Long term) {
-        if (leader.equals(clusterService.topologyService().localMember())) {
-            takeOverActiveActor();
-        } else {
-            stepDownActiveActor();
+        if (!busyLock.enterBusy()) {
+            throw new IgniteInternalException(NODE_STOPPING_ERR, new 
NodeStoppingException());
+        }
+        try {
+            if (leader.equals(clusterService.topologyService().localMember())) 
{
+                takeOverActiveActor();
+            } else {
+                stepDownActiveActor();
+            }
+        } finally {
+            busyLock.leaveBusy();
         }
     }
 
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java
new file mode 100644
index 0000000000..50d2988655
--- /dev/null
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.leases;
+
+import static org.apache.ignite.internal.hlc.HybridTimestamp.MIN_VALUE;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * A lease representation in memory.
+ * The real lease is stored in Meta storage.
+ */
+public class Lease implements Serializable {
+    /** The object is used when nothing holds the lease. Empty lease is always 
expired. */
+    public static Lease EMPTY_LEASE = new Lease(null, MIN_VALUE, MIN_VALUE, 
false);
+
+    /** A node that holds a lease until {@code stopLeas}. */
+    private final ClusterNode leaseholder;
+
+    /** The lease is accepted, when the holder knows about it and applies all 
related obligations. */
+    private final boolean accepted;
+
+    /** Lease start timestamp. The timestamp is assigned when the lease 
created and is not changed when the lease is prolonged. */
+    private final HybridTimestamp startTime;
+
+    /** Timestamp to expiration the lease. */
+    private final HybridTimestamp expirationTime;
+
+    /**
+     * Creates a new lease.
+     *
+     * @param leaseholder Lease holder.
+     * @param startTime Start lease timestamp.
+     * @param leaseExpirationTime Lease expiration timestamp.
+     */
+    public Lease(ClusterNode leaseholder, HybridTimestamp startTime, 
HybridTimestamp leaseExpirationTime) {
+        this(leaseholder, startTime, leaseExpirationTime, false);
+    }
+
+    /**
+     * The constructor.
+     *
+     * @param leaseholder Lease holder.
+     * @param startTime Start lease timestamp.
+     * @param leaseExpirationTime Lease expiration timestamp.
+     * @param accepted The flag is true when the holder accepted the lease, 
the false otherwise.
+     */
+    private Lease(ClusterNode leaseholder, HybridTimestamp startTime, 
HybridTimestamp leaseExpirationTime, boolean accepted) {
+        this.leaseholder = leaseholder;
+        this.expirationTime = leaseExpirationTime;
+        this.startTime = startTime;
+        this.accepted = accepted;
+    }
+
+    /**
+     * Prolongs a lease until new timestamp. Only an accepted lease can be 
prolonged.
+     *
+     * @param to The new lease expiration timestamp.
+     * @return A new lease which will have the same properties except of 
expiration timestamp.
+     */
+    public Lease prolongLease(HybridTimestamp to) {
+        assert accepted : "The lease should be accepted by leaseholder before 
prolongation ["
+                + "leaseholder=" + leaseholder
+                + ", expirationTime=" + expirationTime
+                + ", prolongTo=" + to + ']';
+
+        return new Lease(leaseholder, startTime, to, true);
+    }
+
+    /**
+     * Accepts the lease.
+     *
+     * @param to The new lease expiration timestamp.
+     * @return A accepted lease.
+     */
+    public Lease acceptLease(HybridTimestamp to) {
+        assert !accepted : "The lease is already accepted ["
+                + "leaseholder=" + leaseholder
+                + ", expirationTime=" + expirationTime + ']';
+
+        return new Lease(leaseholder, startTime, to, true);
+    }
+
+    /**
+     * Get a leaseholder node.
+     *
+     * @return Leaseholder or {@code null} if nothing holds the lease.
+     */
+    public ClusterNode getLeaseholder() {
+        return leaseholder;
+    }
+
+    /**
+     * Gets a lease start timestamp.
+     *
+     * @return Lease start timestamp.
+     */
+    public HybridTimestamp getStartTime() {
+        return startTime;
+    }
+
+    /**
+     * Gets a lease expiration timestamp.
+     *
+     * @return Lease expiration timestamp or {@code null} if nothing holds the 
lease.
+     */
+    public HybridTimestamp getExpirationTime() {
+        return expirationTime;
+    }
+
+    /**
+     * Gets accepted flag.
+     *
+     * @return True if the lease accepted, false otherwise.
+     */
+    public boolean isAccepted() {
+        return accepted;
+    }
+
+    @Override
+    public String toString() {
+        return "Lease{"
+                + "leaseholder=" + leaseholder
+                + ", accepted=" + accepted
+                + ", startTime=" + startTime
+                + ", expirationTime=" + expirationTime
+                + '}';
+    }
+}
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseTracker.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
similarity index 96%
rename from 
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseTracker.java
rename to 
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
index 4933f73593..466d716709 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseTracker.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
@@ -15,10 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.placementdriver;
+package org.apache.ignite.internal.placementdriver.leases;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_PREFIX;
+import static 
org.apache.ignite.internal.placementdriver.leases.Lease.EMPTY_LEASE;
 
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -111,7 +112,7 @@ public class LeaseTracker {
      * @return A lease is associated with the group.
      */
     public @NotNull Lease getLease(ReplicationGroupId grpId) {
-        return leases.getOrDefault(grpId, Lease.EMPTY_LEASE);
+        return leases.getOrDefault(grpId, EMPTY_LEASE);
     }
 
     private static String incrementLastChar(String str) {
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java
new file mode 100644
index 0000000000..1ba3d772b6
--- /dev/null
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.negotiation;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.placementdriver.leases.Lease;
+import 
org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
+
+/**
+ * The agreement is formed from {@link LeaseGrantedMessageResponse}.
+ */
+public class LeaseAgreement {
+    /** The agreement, which has not try negotiating yet. */
+    public static final LeaseAgreement UNDEFINED_AGREEMENT = new 
LeaseAgreement(null, completedFuture(null));
+
+    /** Lease. */
+    private final Lease lease;
+
+    /** Future to {@link LeaseGrantedMessageResponse} response. */
+    private final CompletableFuture<LeaseGrantedMessageResponse> responseFut;
+
+    /**
+     * The constructor.
+     *
+     * @param lease Lease.
+     * @param remoteNodeResponseFuture The future of response from the remote 
node which is negotiating the agreement.
+     */
+    public LeaseAgreement(Lease lease, 
CompletableFuture<LeaseGrantedMessageResponse> remoteNodeResponseFuture) {
+        this.lease = lease;
+        this.responseFut = remoteNodeResponseFuture;
+    }
+
+    /**
+     * Gets a lease about which the leaseholder was notified.
+     *
+     * @return Lease.
+     */
+    public Lease getLease() {
+        return lease;
+    }
+
+    /**
+     * Gets a accepted flag. The flag is true, when the lease is accepted by 
leaseholder.
+     *
+     * @return Accepted flag.
+     */
+    public boolean isAccepted() {
+        if (!responseFut.isDone()) {
+            return false;
+        }
+
+        LeaseGrantedMessageResponse resp = responseFut.join();
+
+        if (resp != null) {
+            return resp.accepted();
+        }
+
+        return false;
+    }
+
+    /**
+     * The property matches to {@link 
LeaseGrantedMessageResponse#redirectProposal()}.
+     * This property is available only when the agreement is ready (look at 
{@link #ready()}).
+     *
+     * @return Node id to propose a lease.
+     */
+    public String getRedirectTo() {
+        assert responseFut.isDone() : "The method should be invoked only after 
the agreement is ready";
+
+        LeaseGrantedMessageResponse resp = responseFut.join();
+
+        return resp != null ? resp.redirectProposal() : null;
+    }
+
+    /**
+     * Returns true if the agreement is negotiated, false otherwise.
+     *
+     * @return True if a response of the agreement has been received, false 
otherwise.
+     */
+    public boolean ready() {
+        return responseFut.isDone();
+    }
+}
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
new file mode 100644
index 0000000000..eb8cdb2500
--- /dev/null
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.negotiation;
+
+import static 
org.apache.ignite.internal.placementdriver.negotiation.LeaseAgreement.UNDEFINED_AGREEMENT;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.placementdriver.LeaseUpdater;
+import org.apache.ignite.internal.placementdriver.leases.Lease;
+import 
org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
+import 
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.network.ClusterService;
+
+/**
+ * This class negotiates a lease with leaseholder. If the lease is negotiated, 
it is ready available to accept.
+ */
+public class LeaseNegotiator {
+    /** The logger. */
+    private static final IgniteLogger LOG = 
Loggers.forClass(LeaseNegotiator.class);
+
+    private static final PlacementDriverMessagesFactory 
PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();
+
+    /** Leases ready to accept. */
+    private final Map<ReplicationGroupId, LeaseAgreement> leaseToNegotiate;
+
+    /** Cluster service. */
+    private final ClusterService clusterService;
+
+    /**
+     * The constructor.
+     *
+     * @param clusterService Cluster service.
+     */
+    public LeaseNegotiator(ClusterService clusterService) {
+        this.clusterService = clusterService;
+
+        this.leaseToNegotiate = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Tries negotiating a lease with its leaseholder.
+     * The negotiation will achieve after the method is invoked. Use {@link 
#negotiated(ReplicationGroupId)} to check a result.
+     *
+     * @param groupId Lease replication group id.
+     * @param lease Lease to negotiate.
+     * @param force If the flag is true, the process tries to insist of apply 
the lease.
+     */
+    public void negotiate(ReplicationGroupId groupId, Lease lease, boolean 
force) {
+        var fut = new CompletableFuture<LeaseGrantedMessageResponse>();
+
+        leaseToNegotiate.put(groupId, new LeaseAgreement(lease, fut));
+
+        long leaseInterval = lease.getExpirationTime().getPhysical() - 
lease.getStartTime().getPhysical();
+
+        clusterService.messagingService().invoke(
+                        lease.getLeaseholder().name(),
+                        PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessage()
+                                .groupId(groupId)
+                                .leaseStartTime(lease.getStartTime())
+                                .leaseExpirationTime(lease.getExpirationTime())
+                                .force(force)
+                                .build(),
+                        leaseInterval)
+                .handle((msg, throwable) -> {
+                    if (throwable != null) {
+                        LOG.warn("Lease was not negotiated due to exception 
[lease={}]", throwable, lease);
+                    } else {
+                        assert msg instanceof LeaseGrantedMessageResponse : 
"Message type is unexpected [type="
+                                + msg.getClass().getSimpleName() + ']';
+                    }
+
+                    fut.complete((LeaseGrantedMessageResponse) msg);
+
+                    triggerToRenewLeases();
+
+                    return msg;
+                });
+    }
+
+    /**
+     * Gets a lease agreement or {@code null} if the agreement has not formed 
yet.
+     *
+     * @param groupId Replication group id.
+     * @return Lease agreement.
+     */
+    public LeaseAgreement negotiated(ReplicationGroupId groupId) {
+        LeaseAgreement agreement = leaseToNegotiate.getOrDefault(groupId, 
UNDEFINED_AGREEMENT);
+
+        return agreement;
+    }
+
+    /**
+     * Removes lease from list to negotiate.
+     *
+     * @param groupId Lease to expire.
+     */
+    public void onLeaseRemoved(ReplicationGroupId groupId) {
+        leaseToNegotiate.remove(groupId);
+    }
+
+    /**
+     * Triggers to renew leases forcibly. The method wakes up the monitor of 
{@link LeaseUpdater}.
+     */
+    private void triggerToRenewLeases() {
+        //TODO: IGNITE-18879 Implement lease maintenance.
+    }
+}

Reply via email to