This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new c0bbb420d9 IGNITE-18742 Implement initial lease granting on placement
driver side (#1871)
c0bbb420d9 is described below
commit c0bbb420d9b4083853b97947714e8da4244c2d00
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Thu Apr 13 23:06:58 2023 +0400
IGNITE-18742 Implement initial lease granting on placement driver side
(#1871)
---
.../ignite/internal/hlc/HybridTimestamp.java | 3 +
modules/placement-driver/build.gradle | 3 +-
.../internal/placementdriver/ActiveActorTest.java | 5 +
.../PlacementDriverManagerTest.java | 173 +++++++++++++++++++--
.../ignite/internal/placementdriver/Lease.java | 81 ----------
.../internal/placementdriver/LeaseUpdater.java | 127 ++++++++++++---
.../placementdriver/PlacementDriverManager.java | 38 +++--
.../internal/placementdriver/leases/Lease.java | 146 +++++++++++++++++
.../placementdriver/{ => leases}/LeaseTracker.java | 5 +-
.../negotiation/LeaseAgreement.java | 100 ++++++++++++
.../negotiation/LeaseNegotiator.java | 127 +++++++++++++++
11 files changed, 678 insertions(+), 130 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
index 6ac7f6e4d5..87e47d49d4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
@@ -33,6 +33,9 @@ public final class HybridTimestamp implements
Comparable<HybridTimestamp>, Seria
/** A constant holding the maximum value a {@code HybridTimestamp} can
have. */
public static final HybridTimestamp MAX_VALUE = new
HybridTimestamp(Long.MAX_VALUE, Integer.MAX_VALUE);
+ /** The constant holds the minimum value which {@code HybridTimestamp}
might formally have. */
+ public static final HybridTimestamp MIN_VALUE = new HybridTimestamp(1L,
-1);
+
/**
* Cluster cLock skew. The constant determines the undefined inclusive
interval to compares timestamp from various nodes.
* TODO: IGNITE-18978 Method to comparison timestamps with clock skew.
diff --git a/modules/placement-driver/build.gradle
b/modules/placement-driver/build.gradle
index b12cf94ab0..f494b7ff26 100644
--- a/modules/placement-driver/build.gradle
+++ b/modules/placement-driver/build.gradle
@@ -22,8 +22,9 @@ apply from:
"$rootDir/buildscripts/java-integration-test.gradle"
apply from: "$rootDir/buildscripts/java-test-fixtures.gradle"
dependencies {
- implementation project(':ignite-metastorage-api')
+ api project(':ignite-placement-driver-api')
+ implementation project(':ignite-metastorage-api')
implementation project(':ignite-core')
implementation project(':ignite-configuration-api')
implementation project(':ignite-raft-api')
diff --git
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
index 0f7eeb7a41..8a4292b97a 100644
---
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
+++
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
@@ -27,6 +27,8 @@ import static
org.apache.ignite.utils.ClusterServiceTestUtils.clusterService;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
import java.nio.file.Path;
import java.util.HashMap;
@@ -53,6 +55,7 @@ import
org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
import
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
@@ -438,6 +441,8 @@ public class ActiveActorTest extends IgniteAbstractTest {
int nodes,
int clientPort
) {
+ when(msm.invoke(any(), any(Operation.class),
any(Operation.class))).thenReturn(completedFuture(true));
+
List<NetworkAddress> addresses = getNetworkAddresses(nodes);
var nodeFinder = new StaticNodeFinder(addresses);
diff --git
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
index eda296e2ab..355badbcf5 100644
---
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
+++
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
@@ -26,6 +26,8 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeN
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.utils.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
import static org.apache.ignite.lang.ByteArray.fromString;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
@@ -38,7 +40,9 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -57,6 +61,10 @@ import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
+import org.apache.ignite.internal.placementdriver.leases.Lease;
+import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessage;
+import
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessageGroup;
+import
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
@@ -66,6 +74,7 @@ import
org.apache.ignite.internal.schema.configuration.ExtendedTableChange;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
@@ -78,6 +87,7 @@ import
org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -88,13 +98,23 @@ import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith(ConfigurationExtension.class)
public class PlacementDriverManagerTest extends IgniteAbstractTest {
public static final int PORT = 1234;
+
+ private static final PlacementDriverMessagesFactory
PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();
+
private String nodeName;
- HybridClock clock = new HybridClockImpl();
+ /** Another node name. The node name is matched to {@code
anotherClusterService}. */
+ private String anotherNodeName;
+
+ private HybridClock clock = new HybridClockImpl();
+
private VaultManager vaultManager;
private ClusterService clusterService;
+ /** This service is used to redirect a lease proposal. */
+ private ClusterService anotherClusterService;
+
private Loza raftManager;
@InjectConfiguration
@@ -112,11 +132,18 @@ public class PlacementDriverManagerTest extends
IgniteAbstractTest {
private TestInfo testInfo;
+ /** This closure handles {@link LeaseGrantedMessage} to check the
placement driver manager behavior. */
+ private BiConsumer<LeaseGrantedMessage, String> leaseGrantHandler;
+
@BeforeEach
public void beforeTest(TestInfo testInfo) throws NodeStoppingException {
this.nodeName = testNodeName(testInfo, PORT);
+ this.anotherNodeName = testNodeName(testInfo, PORT + 1);
this.testInfo = testInfo;
+ assertTrue(nodeName.hashCode() < anotherNodeName.hashCode(),
+ "Node for the first lease grant message should be determined
strictly.");
+
startPlacementDriverManager();
}
@@ -126,6 +153,29 @@ public class PlacementDriverManagerTest extends
IgniteAbstractTest {
var nodeFinder = new StaticNodeFinder(Collections.singletonList(new
NetworkAddress("localhost", PORT)));
clusterService = ClusterServiceTestUtils.clusterService(testInfo,
PORT, nodeFinder);
+ anotherClusterService =
ClusterServiceTestUtils.clusterService(testInfo, PORT + 1, nodeFinder);
+
+
anotherClusterService.messagingService().addMessageHandler(PlacementDriverMessageGroup.class,
(msg, sender, correlationId) -> {
+ assert msg instanceof LeaseGrantedMessage : "Message type is
unexpected [type=" + msg.getClass().getSimpleName() + ']';
+
+ log.info("Lease is being granted [actor={}, recipient={},
force={}]", sender, anotherNodeName,
+ ((LeaseGrantedMessage) msg).force());
+
+ if (leaseGrantHandler != null) {
+ leaseGrantHandler.accept((LeaseGrantedMessage) msg, sender);
+ }
+ });
+
+
clusterService.messagingService().addMessageHandler(PlacementDriverMessageGroup.class,
(msg, sender, correlationId) -> {
+ assert msg instanceof LeaseGrantedMessage : "Message type is
unexpected [type=" + msg.getClass().getSimpleName() + ']';
+
+ log.info("Lease is being granted [actor={}, recipient={},
force={}]", sender, nodeName,
+ ((LeaseGrantedMessage) msg).force());
+
+ if (leaseGrantHandler != null) {
+ leaseGrantHandler.accept((LeaseGrantedMessage) msg, sender);
+ }
+ });
ClusterManagementGroupManager cmgManager =
mock(ClusterManagementGroupManager.class);
@@ -183,6 +233,7 @@ public class PlacementDriverManagerTest extends
IgniteAbstractTest {
vaultManager.start();
clusterService.start();
+ anotherClusterService.start();
raftManager.start();
metaStorageManager.start();
placementDriverManager.start();
@@ -205,6 +256,7 @@ public class PlacementDriverManagerTest extends
IgniteAbstractTest {
placementDriverManager.stop();
metaStorageManager.stop();
raftManager.stop();
+ anotherClusterService.stop();
clusterService.stop();
vaultManager.stop();
}
@@ -237,14 +289,15 @@ public class PlacementDriverManagerTest extends
IgniteAbstractTest {
public void testLeaseCreate() throws Exception {
TablePartitionId grpPart0 = createTableAssignment();
- checkLeaseCreated(grpPart0);
+ checkLeaseCreated(grpPart0, false);
}
@Test
+ @WithSystemProperty(key = "IGNITE_LONG_LEASE", value = "200")
public void testLeaseRenew() throws Exception {
TablePartitionId grpPart0 = createTableAssignment();
- checkLeaseCreated(grpPart0);
+ checkLeaseCreated(grpPart0, false);
var leaseFut =
metaStorageManager.get(fromString(PLACEMENTDRIVER_PREFIX + grpPart0));
@@ -257,16 +310,17 @@ public class PlacementDriverManagerTest extends
IgniteAbstractTest {
Lease leaseRenew = ByteUtils.fromBytes(fut.join().value());
- return
lease.getLeaseExpirationTime().compareTo(leaseRenew.getLeaseExpirationTime()) <
0;
+ return
lease.getExpirationTime().compareTo(leaseRenew.getExpirationTime()) < 0;
}, 10_000));
}
@Test
+ @WithSystemProperty(key = "IGNITE_LONG_LEASE", value = "200")
public void testLeaseholderUpdate() throws Exception {
TablePartitionId grpPart0 = createTableAssignment();
- checkLeaseCreated(grpPart0);
+ checkLeaseCreated(grpPart0, false);
Set<Assignment> assignments = Set.of();
@@ -277,7 +331,7 @@ public class PlacementDriverManagerTest extends
IgniteAbstractTest {
Lease lease = ByteUtils.fromBytes(fut.join().value());
- return lease.getLeaseExpirationTime().compareTo(clock.now()) < 0;
+ return lease.getExpirationTime().compareTo(clock.now()) < 0;
}, 10_000));
@@ -290,37 +344,129 @@ public class PlacementDriverManagerTest extends
IgniteAbstractTest {
Lease lease = ByteUtils.fromBytes(fut.join().value());
- return lease.getLeaseExpirationTime().compareTo(clock.now()) > 0;
+ return lease.getExpirationTime().compareTo(clock.now()) > 0;
}, 10_000));
}
+ @Test
+ @Disabled("IGNITE-18958 Implement handling of lease grant responses on
placement driver side")
+ public void testLeaseAccepted() throws Exception {
+ TablePartitionId grpPart0 = createTableAssignment();
+
+ checkLeaseCreated(grpPart0, true);
+ }
+
+ @Test
+ @Disabled("IGNITE-18958 Implement handling of lease grant responses on
placement driver side")
+ public void testLeaseForceAccepted() throws Exception {
+ leaseGrantHandler = (req, sender) ->
+ PLACEMENT_DRIVER_MESSAGES_FACTORY
+ .leaseGrantedMessageResponse()
+ .accepted(req.force())
+ .build();
+
+ TablePartitionId grpPart0 = createTableAssignment();
+
+ checkLeaseCreated(grpPart0, true);
+ }
+
+ @Test
+ public void testExceptionOnAcceptance() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+
+ leaseGrantHandler = (req, sender) -> {
+ latch.countDown();
+
+ throw new RuntimeException("test");
+ };
+
+ TablePartitionId grpPart0 = createTableAssignment();
+
+ checkLeaseCreated(grpPart0, false);
+
+ latch.await();
+
+ Lease lease = checkLeaseCreated(grpPart0, false);
+
+ assertFalse(lease.isAccepted());
+ }
+
+ @Test
+ @Disabled("IGNITE-18958 Implement handling of lease grant responses on
placement driver side")
+ public void testRedirectionAcceptance() throws Exception {
+ leaseGrantHandler = (req, sender) ->
+ PLACEMENT_DRIVER_MESSAGES_FACTORY
+ .leaseGrantedMessageResponse()
+ .accepted(false)
+ .redirectProposal(anotherNodeName)
+ .build();
+
+ TablePartitionId grpPart0 = createTableAssignment();
+
+ checkLeaseCreated(grpPart0, true);
+ }
+
@Test
public void testLeaseRestore() throws Exception {
TablePartitionId grpPart0 = createTableAssignment();
- checkLeaseCreated(grpPart0);
+ checkLeaseCreated(grpPart0, false);
stopPlacementDriverManager();
startPlacementDriverManager();
- checkLeaseCreated(grpPart0);
+ checkLeaseCreated(grpPart0, false);
+ }
+
+ @Test
+ public void testLeaseMatchGrantMessage() throws Exception {
+ var leaseGrantReqRef = new AtomicReference<LeaseGrantedMessage>();
+
+ leaseGrantHandler = (req, sender) -> {
+ leaseGrantReqRef.set(req);
+ };
+
+ TablePartitionId grpPart0 = createTableAssignment();
+
+ Lease lease = checkLeaseCreated(grpPart0, false);
+
+ assertTrue(waitForCondition(() -> leaseGrantReqRef.get() != null,
10_000));
+
+ assertEquals(leaseGrantReqRef.get().leaseStartTime(),
lease.getStartTime());
+ assertEquals(leaseGrantReqRef.get().leaseExpirationTime(),
lease.getExpirationTime());
}
/**
* Checks if a group lease is created during the timeout.
*
* @param grpPartId Replication group id.
+ * @param waitAccept Await a lease with the accepted flag.
+ * @return A lease that is read from Meta storage.
* @throws InterruptedException If the waiting is interrupted.
*/
- private void checkLeaseCreated(TablePartitionId grpPartId) throws
InterruptedException {
+ private Lease checkLeaseCreated(TablePartitionId grpPartId, boolean
waitAccept) throws InterruptedException {
+ AtomicReference<Lease> leaseRef = new AtomicReference<>();
+
assertTrue(waitForCondition(() -> {
var leaseFut =
metaStorageManager.get(fromString(PLACEMENTDRIVER_PREFIX + grpPartId));
var leaseEntry = leaseFut.join();
- return leaseEntry != null && !leaseEntry.empty();
+ if (leaseEntry != null && !leaseEntry.empty()) {
+ Lease lease = ByteUtils.fromBytes(leaseEntry.value());
+
+ if (!waitAccept) {
+ leaseRef.set(lease);
+ } else if (lease.isAccepted()) {
+ leaseRef.set(lease);
+ }
+ }
+
+ return leaseRef.get() != null;
}, 10_000));
+
+ return leaseRef.get();
}
/**
@@ -332,7 +478,7 @@ public class PlacementDriverManagerTest extends
IgniteAbstractTest {
private TablePartitionId createTableAssignment() throws Exception {
AtomicReference<UUID> tblIdRef = new AtomicReference<>();
- List<Set<Assignment>> assignments =
AffinityUtils.calculateAssignments(Collections.singleton(nodeName), 1, 1);
+ List<Set<Assignment>> assignments =
AffinityUtils.calculateAssignments(List.of(nodeName, anotherNodeName), 1, 2);
int zoneId = createZone();
@@ -350,6 +496,7 @@ public class PlacementDriverManagerTest extends
IgniteAbstractTest {
var grpPart0 = new TablePartitionId(tblIdRef.get(), 0);
log.info("Fake table created [id={}, repGrp={}]", tblIdRef.get(),
grpPart0);
+
return grpPart0;
}
@@ -362,7 +509,7 @@ public class PlacementDriverManagerTest extends
IgniteAbstractTest {
dstZnsCfg.distributionZones().change(zones -> {
zones.create("zone1", ch -> {
ch.changePartitions(1);
- ch.changeReplicas(1);
+ ch.changeReplicas(2);
ch.changeZoneId(DEFAULT_ZONE_ID + 1);
});
}).join();
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/Lease.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/Lease.java
deleted file mode 100644
index e53428983b..0000000000
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/Lease.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.placementdriver;
-
-import java.io.Serializable;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.network.ClusterNode;
-
-/**
- * A lease representation in memory.
- * The real lease is stored in Meta storage.
- */
-public class Lease implements Serializable {
- /** The object is used when nothing holds the lease. */
- public static Lease EMPTY_LEASE = new Lease(null, null);
-
- /** A node that holds a lease until {@code stopLeas}. */
- private final ClusterNode leaseholder;
-
- /** Timestamp to expiration the lease. */
- private final HybridTimestamp leaseExpirationTime;
-
- /**
- * Default constructor.
- */
- public Lease() {
- this(null, null);
- }
-
- /**
- * The constructor.
- *
- * @param leaseholder Lease holder.
- * @param leaseExpirationTime Lease expiration timestamp.
- */
- public Lease(ClusterNode leaseholder, HybridTimestamp leaseExpirationTime)
{
- this.leaseholder = leaseholder;
- this.leaseExpirationTime = leaseExpirationTime;
- }
-
- /**
- * Get a leaseholder node.
- *
- * @return Leaseholder or {@code null} if nothing holds the lease.
- */
- public ClusterNode getLeaseholder() {
- return leaseholder;
- }
-
- /**
- * Gets a lease expiration timestamp.
- *
- * @return Lease expiration timestamp or {@code null} if nothing holds the
lease.
- */
- public HybridTimestamp getLeaseExpirationTime() {
- return leaseExpirationTime;
- }
-
- @Override
- public String toString() {
- return "Lease {"
- + "leaseholder=" + leaseholder
- + ", leaseExpirationTime=" + leaseExpirationTime
- + '}';
- }
-}
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
index 223a8f87ef..1b4187d264 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
@@ -22,7 +22,6 @@ import static
org.apache.ignite.internal.metastorage.dsl.Conditions.or;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.value;
import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
-import static org.apache.ignite.internal.placementdriver.Lease.EMPTY_LEASE;
import static
org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_PREFIX;
import java.util.Map;
@@ -35,13 +34,18 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.placementdriver.leases.Lease;
+import org.apache.ignite.internal.placementdriver.leases.LeaseTracker;
+import org.apache.ignite.internal.placementdriver.negotiation.LeaseNegotiator;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteSystemProperties;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
/**
* A processor to manger leases. The process is started when placement driver
activates and stopped when it deactivates.
@@ -54,7 +58,13 @@ public class LeaseUpdater {
private static final long UPDATE_LEASE_MS = 200L;
/** Lease holding interval. */
- private static final long LEASE_PERIOD = 10 * UPDATE_LEASE_MS;
+ public static final long LEASE_INTERVAL = 10 * UPDATE_LEASE_MS;
+
+ /** Long lease interval. The interval is used between lease granting
attempts. */
+ private final long longLeaseInterval;
+
+ /** Cluster service. */
+ private final ClusterService clusterService;
/** Meta storage manager. */
private final MetaStorageManager msManager;
@@ -77,20 +87,25 @@ public class LeaseUpdater {
/** Dedicated thread to update leases. */
private volatile Thread updaterTread;
+ /** Processor to communicate with the leaseholder to negotiate the lease.
*/
+ private LeaseNegotiator leaseNegotiator;
+
/** Node name. */
private String nodeName;
/**
* The constructor.
*
+ * @param clusterService Cluster service.
* @param vaultManager Vault manager.
- * @param msManager Metastorage manager.
+ * @param msManager Meta storage manager.
* @param topologyService Topology service.
* @param tablesConfiguration Tables configuration.
* @param leaseTracker Lease tracker.
* @param clock Cluster clock.
*/
public LeaseUpdater(
+ ClusterService clusterService,
VaultManager vaultManager,
MetaStorageManager msManager,
LogicalTopologyService topologyService,
@@ -99,10 +114,12 @@ public class LeaseUpdater {
LeaseTracker leaseTracker,
HybridClock clock
) {
+ this.clusterService = clusterService;
this.msManager = msManager;
this.leaseTracker = leaseTracker;
this.clock = clock;
+ this.longLeaseInterval =
IgniteSystemProperties.getLong("IGNITE_LONG_LEASE", 120_000L);
this.assignmentsTracker = new AssignmentsTracker(vaultManager,
msManager, tablesConfiguration, distributionZonesConfiguration);
this.topologyTracker = new TopologyTracker(topologyService);
this.updater = new Updater();
@@ -130,6 +147,8 @@ public class LeaseUpdater {
* Activates a lease updater to renew leases.
*/
public void activate() {
+ leaseNegotiator = new LeaseNegotiator(clusterService);
+
//TODO: IGNITE-18879 Implement lease maintenance.
updaterTread = new Thread(updater,
NamedThreadFactory.threadPrefix(nodeName, "lease-updater"));
@@ -146,6 +165,8 @@ public class LeaseUpdater {
updaterTread = null;
}
+
+ leaseNegotiator = null;
}
/**
@@ -156,8 +177,16 @@ public class LeaseUpdater {
*/
private ClusterNode nextLeaseHolder(Set<Assignment> assignments) {
//TODO: IGNITE-18879 Implement more intellectual algorithm to choose a
node.
+ String consistentId = null;
+
for (Assignment assignment : assignments) {
- ClusterNode candidate =
topologyTracker.nodeByConsistentId(assignment.consistentId());
+ if (consistentId == null || consistentId.hashCode() >
assignment.consistentId().hashCode()) {
+ consistentId = assignment.consistentId();
+ }
+ }
+
+ if (consistentId != null) {
+ ClusterNode candidate =
topologyTracker.nodeByConsistentId(consistentId);
if (candidate != null) {
return candidate;
@@ -174,29 +203,30 @@ public class LeaseUpdater {
@Override
public void run() {
while (updaterTread != null && !updaterTread.isInterrupted()) {
+ long outdatedLeaseThreshold = clock.now().getPhysical() +
LEASE_INTERVAL / 2;
+
for (Map.Entry<ReplicationGroupId, Set<Assignment>> entry :
assignmentsTracker.assignments().entrySet()) {
ReplicationGroupId grpId = entry.getKey();
Lease lease = leaseTracker.getLease(grpId);
- HybridTimestamp now = clock.now();
-
- // Nothing holds the lease.
- if (lease == EMPTY_LEASE
- // The lease is near to expiration.
- || now.getPhysical() >
(lease.getLeaseExpirationTime().getPhysical() - LEASE_PERIOD / 2)) {
+ // The lease is expired or close to this.
+ if (lease.getExpirationTime().getPhysical() <
outdatedLeaseThreshold) {
ClusterNode candidate =
nextLeaseHolder(entry.getValue());
if (candidate == null) {
continue;
}
- if (isReplicationGroupUpdateLeaseholder(lease,
candidate)) {
- updateLeaseInMetaStorage(grpId, lease, candidate);
+ // We can't prolong the expired lease because we
already have an interval of time when the lease was not active,
+ // so we must start ne negotiation round from the
beginning; the same we do for the groups that don't have
+ // leaseholders at all.
+ if (isLeaseOutdated(lease)) {
// New lease is granting.
- } else if (candidate.equals(lease.getLeaseholder())) {
- updateLeaseInMetaStorage(grpId, lease, candidate);
+ writeNewLeasInMetaStorage(grpId, lease, candidate);
+ } else if (lease.isAccepted() &&
candidate.equals(lease.getLeaseholder())) {
// Old lease is renewing.
+ prolongLeaseInMetaStorage(grpId, lease);
}
}
}
@@ -210,39 +240,90 @@ public class LeaseUpdater {
}
/**
- * Writes a lease in Meta storage.
+ * Writes a new lease in Meta storage.
*
* @param grpId Replication group id.
* @param lease Old lease to apply CAS in Meta storage.
* @param candidate Lease candidate.
*/
- private void updateLeaseInMetaStorage(ReplicationGroupId grpId, Lease
lease, ClusterNode candidate) {
+ private void writeNewLeasInMetaStorage(ReplicationGroupId grpId, Lease
lease, ClusterNode candidate) {
var leaseKey = ByteArray.fromString(PLACEMENTDRIVER_PREFIX +
grpId);
- var newTs = new HybridTimestamp(clock.now().getPhysical() +
LEASE_PERIOD, 0);
+
+ HybridTimestamp startTs = clock.now();
+
+ var expirationTs = new HybridTimestamp(startTs.getPhysical() +
longLeaseInterval, 0);
byte[] leaseRaw = ByteUtils.toBytes(lease);
- Lease renewedLease = new Lease(candidate, newTs);
+ Lease renewedLease = new Lease(candidate, startTs, expirationTs);
msManager.invoke(
or(notExists(leaseKey), value(leaseKey).eq(leaseRaw)),
put(leaseKey, ByteUtils.toBytes(renewedLease)),
noop()
+ ).thenAccept(isCreated -> {
+ if (isCreated) {
+ boolean force = candidate.equals(lease.getLeaseholder());
+
+ leaseNegotiator.negotiate(grpId, renewedLease, force);
+ }
+ });
+ }
+
+ /**
+ * Writes a prolong lease in Meta storage.
+ *
+ * @param grpId Replication group id.
+ * @param lease Lease to prolong.
+ */
+ private void prolongLeaseInMetaStorage(ReplicationGroupId grpId, Lease
lease) {
+ var leaseKey = ByteArray.fromString(PLACEMENTDRIVER_PREFIX +
grpId);
+ var newTs = new HybridTimestamp(clock.now().getPhysical() +
LEASE_INTERVAL, 0);
+
+ byte[] leaseRaw = ByteUtils.toBytes(lease);
+
+ Lease renewedLease = lease.prolongLease(newTs);
+
+ msManager.invoke(
+ value(leaseKey).eq(leaseRaw),
+ put(leaseKey, ByteUtils.toBytes(renewedLease)),
+ noop()
+ );
+ }
+
+ /**
+ * Writes an accepted lease in Meta storage. After the lease will be
written to Meta storage,
+ * the lease becomes available to all components.
+ *
+ * @param grpId Replication group id.
+ * @param lease Lease to accept.
+ */
+ private void publishLease(ReplicationGroupId grpId, Lease lease) {
+ var leaseKey = ByteArray.fromString(PLACEMENTDRIVER_PREFIX +
grpId);
+ var newTs = new HybridTimestamp(clock.now().getPhysical() +
LEASE_INTERVAL, 0);
+
+ byte[] leaseRaw = ByteUtils.toBytes(lease);
+
+ Lease renewedLease = lease.acceptLease(newTs);
+
+ msManager.invoke(
+ value(leaseKey).eq(leaseRaw),
+ put(leaseKey, ByteUtils.toBytes(renewedLease)),
+ noop()
);
}
/**
- * Checks that a leaseholder candidate can take a lease on the
replication group.
+ * Checks that the lease is outdated.
+ * {@link Lease#EMPTY_LEASE} is always outdated.
*
* @param lease Lease.
- * @param candidate The node is a leaseholder candidate.
* @return True when the candidate can be a leaseholder, otherwise
false.
*/
- private boolean isReplicationGroupUpdateLeaseholder(Lease lease,
ClusterNode candidate) {
+ private boolean isLeaseOutdated(Lease lease) {
HybridTimestamp now = clock.now();
- return lease == EMPTY_LEASE
- || (!candidate.equals(lease.getLeaseholder()) &&
now.after(lease.getLeaseExpirationTime()));
+ return now.after(lease.getExpirationTime());
}
}
}
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
index e0894a7abb..0680e5bc73 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.placementdriver;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -30,6 +31,7 @@ import
org.apache.ignite.internal.distributionzones.configuration.DistributionZo
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.placementdriver.leases.LeaseTracker;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.RaftManager;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
@@ -38,6 +40,7 @@ import
org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
@@ -49,7 +52,7 @@ import org.jetbrains.annotations.TestOnly;
* The another role of the manager is providing a node, which is leaseholder
at the moment, for a particular replication group.
*/
public class PlacementDriverManager implements IgniteComponent {
- static final String PLACEMENTDRIVER_PREFIX = "placementdriver.lease.";
+ public static final String PLACEMENTDRIVER_PREFIX =
"placementdriver.lease.";
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -118,6 +121,7 @@ public class PlacementDriverManager implements
IgniteComponent {
this.raftClientFuture = new CompletableFuture<>();
this.leaseTracker = new LeaseTracker(vaultManager, metaStorageMgr);
this.leaseUpdater = new LeaseUpdater(
+ clusterService,
vaultManager,
metaStorageMgr,
logicalTopologyService,
@@ -165,13 +169,20 @@ public class PlacementDriverManager implements
IgniteComponent {
/** {@inheritDoc} */
@Override
public void beforeNodeStop() {
- withRaftClientIfPresent(c -> {
- c.unsubscribeLeader().join();
+ if (!busyLock.enterBusy()) {
+ throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
+ }
+ try {
+ withRaftClientIfPresent(c -> {
+ c.unsubscribeLeader().join();
- leaseUpdater.deInit();
- });
+ leaseUpdater.deInit();
+ });
- leaseTracker.stopTrack();
+ leaseTracker.stopTrack();
+ } finally {
+ busyLock.leaveBusy();
+ }
}
/** {@inheritDoc} */
@@ -195,10 +206,17 @@ public class PlacementDriverManager implements
IgniteComponent {
}
private void onLeaderChange(ClusterNode leader, Long term) {
- if (leader.equals(clusterService.topologyService().localMember())) {
- takeOverActiveActor();
- } else {
- stepDownActiveActor();
+ if (!busyLock.enterBusy()) {
+ throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
+ }
+ try {
+ if (leader.equals(clusterService.topologyService().localMember()))
{
+ takeOverActiveActor();
+ } else {
+ stepDownActiveActor();
+ }
+ } finally {
+ busyLock.leaveBusy();
}
}
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java
new file mode 100644
index 0000000000..50d2988655
--- /dev/null
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.leases;
+
+import static org.apache.ignite.internal.hlc.HybridTimestamp.MIN_VALUE;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * A lease representation in memory.
+ * The real lease is stored in Meta storage.
+ */
+public class Lease implements Serializable {
+ /** The object is used when nothing holds the lease. Empty lease is always
expired. */
+ public static Lease EMPTY_LEASE = new Lease(null, MIN_VALUE, MIN_VALUE,
false);
+
+ /** A node that holds a lease until {@code stopLeas}. */
+ private final ClusterNode leaseholder;
+
+ /** The lease is accepted, when the holder knows about it and applies all
related obligations. */
+ private final boolean accepted;
+
+ /** Lease start timestamp. The timestamp is assigned when the lease
created and is not changed when the lease is prolonged. */
+ private final HybridTimestamp startTime;
+
+ /** Timestamp to expiration the lease. */
+ private final HybridTimestamp expirationTime;
+
+ /**
+ * Creates a new lease.
+ *
+ * @param leaseholder Lease holder.
+ * @param startTime Start lease timestamp.
+ * @param leaseExpirationTime Lease expiration timestamp.
+ */
+ public Lease(ClusterNode leaseholder, HybridTimestamp startTime,
HybridTimestamp leaseExpirationTime) {
+ this(leaseholder, startTime, leaseExpirationTime, false);
+ }
+
+ /**
+ * The constructor.
+ *
+ * @param leaseholder Lease holder.
+ * @param startTime Start lease timestamp.
+ * @param leaseExpirationTime Lease expiration timestamp.
+ * @param accepted The flag is true when the holder accepted the lease,
the false otherwise.
+ */
+ private Lease(ClusterNode leaseholder, HybridTimestamp startTime,
HybridTimestamp leaseExpirationTime, boolean accepted) {
+ this.leaseholder = leaseholder;
+ this.expirationTime = leaseExpirationTime;
+ this.startTime = startTime;
+ this.accepted = accepted;
+ }
+
+ /**
+ * Prolongs a lease until new timestamp. Only an accepted lease can be
prolonged.
+ *
+ * @param to The new lease expiration timestamp.
+ * @return A new lease which will have the same properties except of
expiration timestamp.
+ */
+ public Lease prolongLease(HybridTimestamp to) {
+ assert accepted : "The lease should be accepted by leaseholder before
prolongation ["
+ + "leaseholder=" + leaseholder
+ + ", expirationTime=" + expirationTime
+ + ", prolongTo=" + to + ']';
+
+ return new Lease(leaseholder, startTime, to, true);
+ }
+
+ /**
+ * Accepts the lease.
+ *
+ * @param to The new lease expiration timestamp.
+ * @return A accepted lease.
+ */
+ public Lease acceptLease(HybridTimestamp to) {
+ assert !accepted : "The lease is already accepted ["
+ + "leaseholder=" + leaseholder
+ + ", expirationTime=" + expirationTime + ']';
+
+ return new Lease(leaseholder, startTime, to, true);
+ }
+
+ /**
+ * Get a leaseholder node.
+ *
+ * @return Leaseholder or {@code null} if nothing holds the lease.
+ */
+ public ClusterNode getLeaseholder() {
+ return leaseholder;
+ }
+
+ /**
+ * Gets a lease start timestamp.
+ *
+ * @return Lease start timestamp.
+ */
+ public HybridTimestamp getStartTime() {
+ return startTime;
+ }
+
+ /**
+ * Gets a lease expiration timestamp.
+ *
+ * @return Lease expiration timestamp or {@code null} if nothing holds the
lease.
+ */
+ public HybridTimestamp getExpirationTime() {
+ return expirationTime;
+ }
+
+ /**
+ * Gets accepted flag.
+ *
+ * @return True if the lease accepted, false otherwise.
+ */
+ public boolean isAccepted() {
+ return accepted;
+ }
+
+ @Override
+ public String toString() {
+ return "Lease{"
+ + "leaseholder=" + leaseholder
+ + ", accepted=" + accepted
+ + ", startTime=" + startTime
+ + ", expirationTime=" + expirationTime
+ + '}';
+ }
+}
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseTracker.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
similarity index 96%
rename from
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseTracker.java
rename to
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
index 4933f73593..466d716709 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseTracker.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
@@ -15,10 +15,11 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.placementdriver;
+package org.apache.ignite.internal.placementdriver.leases;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_PREFIX;
+import static
org.apache.ignite.internal.placementdriver.leases.Lease.EMPTY_LEASE;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -111,7 +112,7 @@ public class LeaseTracker {
* @return A lease is associated with the group.
*/
public @NotNull Lease getLease(ReplicationGroupId grpId) {
- return leases.getOrDefault(grpId, Lease.EMPTY_LEASE);
+ return leases.getOrDefault(grpId, EMPTY_LEASE);
}
private static String incrementLastChar(String str) {
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java
new file mode 100644
index 0000000000..1ba3d772b6
--- /dev/null
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.negotiation;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.placementdriver.leases.Lease;
+import
org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
+
+/**
+ * The agreement is formed from {@link LeaseGrantedMessageResponse}.
+ */
+public class LeaseAgreement {
+ /** The agreement, which has not try negotiating yet. */
+ public static final LeaseAgreement UNDEFINED_AGREEMENT = new
LeaseAgreement(null, completedFuture(null));
+
+ /** Lease. */
+ private final Lease lease;
+
+ /** Future to {@link LeaseGrantedMessageResponse} response. */
+ private final CompletableFuture<LeaseGrantedMessageResponse> responseFut;
+
+ /**
+ * The constructor.
+ *
+ * @param lease Lease.
+ * @param remoteNodeResponseFuture The future of response from the remote
node which is negotiating the agreement.
+ */
+ public LeaseAgreement(Lease lease,
CompletableFuture<LeaseGrantedMessageResponse> remoteNodeResponseFuture) {
+ this.lease = lease;
+ this.responseFut = remoteNodeResponseFuture;
+ }
+
+ /**
+ * Gets a lease about which the leaseholder was notified.
+ *
+ * @return Lease.
+ */
+ public Lease getLease() {
+ return lease;
+ }
+
+ /**
+ * Gets a accepted flag. The flag is true, when the lease is accepted by
leaseholder.
+ *
+ * @return Accepted flag.
+ */
+ public boolean isAccepted() {
+ if (!responseFut.isDone()) {
+ return false;
+ }
+
+ LeaseGrantedMessageResponse resp = responseFut.join();
+
+ if (resp != null) {
+ return resp.accepted();
+ }
+
+ return false;
+ }
+
+ /**
+ * The property matches to {@link
LeaseGrantedMessageResponse#redirectProposal()}.
+ * This property is available only when the agreement is ready (look at
{@link #ready()}).
+ *
+ * @return Node id to propose a lease.
+ */
+ public String getRedirectTo() {
+ assert responseFut.isDone() : "The method should be invoked only after
the agreement is ready";
+
+ LeaseGrantedMessageResponse resp = responseFut.join();
+
+ return resp != null ? resp.redirectProposal() : null;
+ }
+
+ /**
+ * Returns true if the agreement is negotiated, false otherwise.
+ *
+ * @return True if a response of the agreement has been received, false
otherwise.
+ */
+ public boolean ready() {
+ return responseFut.isDone();
+ }
+}
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
new file mode 100644
index 0000000000..eb8cdb2500
--- /dev/null
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.negotiation;
+
+import static
org.apache.ignite.internal.placementdriver.negotiation.LeaseAgreement.UNDEFINED_AGREEMENT;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.placementdriver.LeaseUpdater;
+import org.apache.ignite.internal.placementdriver.leases.Lease;
+import
org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
+import
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.network.ClusterService;
+
+/**
+ * This class negotiates a lease with leaseholder. If the lease is negotiated,
it is ready available to accept.
+ */
+public class LeaseNegotiator {
+ /** The logger. */
+ private static final IgniteLogger LOG =
Loggers.forClass(LeaseNegotiator.class);
+
+ private static final PlacementDriverMessagesFactory
PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();
+
+ /** Leases ready to accept. */
+ private final Map<ReplicationGroupId, LeaseAgreement> leaseToNegotiate;
+
+ /** Cluster service. */
+ private final ClusterService clusterService;
+
+ /**
+ * The constructor.
+ *
+ * @param clusterService Cluster service.
+ */
+ public LeaseNegotiator(ClusterService clusterService) {
+ this.clusterService = clusterService;
+
+ this.leaseToNegotiate = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * Tries negotiating a lease with its leaseholder.
+ * The negotiation will achieve after the method is invoked. Use {@link
#negotiated(ReplicationGroupId)} to check a result.
+ *
+ * @param groupId Lease replication group id.
+ * @param lease Lease to negotiate.
+ * @param force If the flag is true, the process tries to insist of apply
the lease.
+ */
+ public void negotiate(ReplicationGroupId groupId, Lease lease, boolean
force) {
+ var fut = new CompletableFuture<LeaseGrantedMessageResponse>();
+
+ leaseToNegotiate.put(groupId, new LeaseAgreement(lease, fut));
+
+ long leaseInterval = lease.getExpirationTime().getPhysical() -
lease.getStartTime().getPhysical();
+
+ clusterService.messagingService().invoke(
+ lease.getLeaseholder().name(),
+ PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessage()
+ .groupId(groupId)
+ .leaseStartTime(lease.getStartTime())
+ .leaseExpirationTime(lease.getExpirationTime())
+ .force(force)
+ .build(),
+ leaseInterval)
+ .handle((msg, throwable) -> {
+ if (throwable != null) {
+ LOG.warn("Lease was not negotiated due to exception
[lease={}]", throwable, lease);
+ } else {
+ assert msg instanceof LeaseGrantedMessageResponse :
"Message type is unexpected [type="
+ + msg.getClass().getSimpleName() + ']';
+ }
+
+ fut.complete((LeaseGrantedMessageResponse) msg);
+
+ triggerToRenewLeases();
+
+ return msg;
+ });
+ }
+
+ /**
+ * Gets a lease agreement or {@code null} if the agreement has not formed
yet.
+ *
+ * @param groupId Replication group id.
+ * @return Lease agreement.
+ */
+ public LeaseAgreement negotiated(ReplicationGroupId groupId) {
+ LeaseAgreement agreement = leaseToNegotiate.getOrDefault(groupId,
UNDEFINED_AGREEMENT);
+
+ return agreement;
+ }
+
+ /**
+ * Removes lease from list to negotiate.
+ *
+ * @param groupId Lease to expire.
+ */
+ public void onLeaseRemoved(ReplicationGroupId groupId) {
+ leaseToNegotiate.remove(groupId);
+ }
+
+ /**
+ * Triggers to renew leases forcibly. The method wakes up the monitor of
{@link LeaseUpdater}.
+ */
+ private void triggerToRenewLeases() {
+ //TODO: IGNITE-18879 Implement lease maintenance.
+ }
+}