This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 62c8a59424 IGNITE-20678 Adding ReplicaMeta#getLeaseholderId to avoid
errors during node recovery (#2741)
62c8a59424 is described below
commit 62c8a59424ead82c025792db5272c78fab682449
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Oct 31 17:04:14 2023 +0300
IGNITE-20678 Adding ReplicaMeta#getLeaseholderId to avoid errors during
node recovery (#2741)
---
.../org/apache/ignite/internal/util/ByteUtils.java | 23 +++
.../apache/ignite/internal/util/ByteUtilsTest.java | 46 +++++
.../internal/index/IndexBuildControllerTest.java | 37 ++--
modules/placement-driver-api/build.gradle | 1 +
.../internal/placementdriver/PlacementDriver.java | 6 +
.../internal/placementdriver/ReplicaMeta.java | 28 +--
.../placementdriver/TestPlacementDriver.java | 34 +++-
.../placementdriver/TestReplicaMetaImpl.java | 78 ++++++--
.../internal/placementdriver/LeaseUpdater.java | 4 +-
.../internal/placementdriver/leases/Lease.java | 219 ++++++++++++---------
.../placementdriver/leases/LeaseTracker.java | 10 +-
.../internal/placementdriver/LeaseTrackerTest.java | 11 +-
.../internal/placementdriver/LeaseUpdaterTest.java | 2 +-
.../placementdriver/PlacementDriverTest.java | 7 +
.../leases/LeaseSerializationTest.java | 77 +++++---
.../ItPlacementDriverReplicaSideTest.java | 26 +--
.../replicator/PlacementDriverReplicaSideTest.java | 2 +-
.../rebalance/ItRebalanceDistributedTest.java | 2 +-
.../runner/app/ItIgniteNodeRestartTest.java | 2 +-
.../exec/rel/TableScanNodeExecutionTest.java | 7 +-
.../ignite/distributed/ItTablePersistenceTest.java | 12 +-
.../ignite/distributed/ReplicaUnavailableTest.java | 20 +-
.../ignite/internal/table/ItColocationTest.java | 4 +-
.../table/distributed/TableManagerTest.java | 2 +-
.../PartitionReplicaListenerDurableUnlockTest.java | 13 +-
.../PartitionReplicaListenerIndexLockingTest.java | 2 +-
.../replication/PartitionReplicaListenerTest.java | 2 +-
.../apache/ignite/distributed/ItTxTestCluster.java | 5 +-
.../table/impl/DummyInternalTableImpl.java | 2 +-
.../apache/ignite/internal/tx/TxManagerTest.java | 15 +-
30 files changed, 470 insertions(+), 229 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
index cec4ce2754..24bf692d63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
@@ -17,12 +17,16 @@
package org.apache.ignite.internal.util;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import java.nio.charset.StandardCharsets;
import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
/**
* Utility class provides various method for manipulating with bytes.
@@ -215,4 +219,23 @@ public class ByteUtils {
throw new IgniteInternalException("Could not deserialize an
object", e);
}
}
+
+ /**
+ * Converts a string to a byte array using {@link StandardCharsets#UTF_8},
{@code null} if {@code s} is {@code null}.
+ *
+ * @param s String to convert.
+ * @see #stringFromBytes(byte[])
+ */
+ public static byte @Nullable [] stringToBytes(@Nullable String s) {
+ return s == null ? null : s.getBytes(UTF_8);
+ }
+
+ /**
+ * Converts a byte array to a string using {@link StandardCharsets#UTF_8},
{@code null} if {@code bytes} is {@code null}.
+ *
+ * @param bytes String bytes.
+ */
+ public static @Nullable String stringFromBytes(byte @Nullable [] bytes) {
+ return bytes == null ? null : new String(bytes, UTF_8);
+ }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/ByteUtilsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/ByteUtilsTest.java
new file mode 100644
index 0000000000..ad99e9fc67
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/ByteUtilsTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.ignite.internal.util.ByteUtils.stringFromBytes;
+import static org.apache.ignite.internal.util.ByteUtils.stringToBytes;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import org.junit.jupiter.api.Test;
+
+/** For {@link ByteUtils} testing. */
+public class ByteUtilsTest {
+ @Test
+ void testStringToBytes() {
+ assertNull(stringToBytes(null));
+
+ assertArrayEquals("".getBytes(UTF_8), stringToBytes(""));
+ assertArrayEquals("abc".getBytes(UTF_8), stringToBytes("abc"));
+ }
+
+ @Test
+ void testStringFromBytes() {
+ assertNull(stringFromBytes(null));
+
+ assertEquals("", stringFromBytes(stringToBytes("")));
+ assertEquals("abc", stringFromBytes(stringToBytes("abc")));
+ }
+}
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
index 624aa297fa..c3ce885949 100644
---
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
@@ -65,7 +65,9 @@ import
org.apache.ignite.internal.table.distributed.index.IndexBuilder;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterNodeImpl;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.TopologyService;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -75,6 +77,8 @@ import org.junit.jupiter.api.Test;
public class IndexBuildControllerTest extends BaseIgniteAbstractTest {
private static final String NODE_NAME = "test_node";
+ private static final String NODE_ID = "test_node_id";
+
private static final String TABLE_NAME = "test_table";
private static final String COLUMN_NAME = "test_column";
@@ -83,7 +87,7 @@ public class IndexBuildControllerTest extends
BaseIgniteAbstractTest {
private static final int PARTITION_ID = 10;
- private ClusterNode localNode;
+ private final ClusterNode localNode = new ClusterNodeImpl(NODE_ID,
NODE_NAME, mock(NetworkAddress.class));
private IndexBuilder indexBuilder;
@@ -97,8 +101,6 @@ public class IndexBuildControllerTest extends
BaseIgniteAbstractTest {
@BeforeEach
void setUp() {
- localNode = mock(ClusterNode.class, invocation -> NODE_NAME);
-
indexBuilder = mock(IndexBuilder.class);
IndexManager indexManager = mock(IndexManager.class, invocation -> {
@@ -136,7 +138,7 @@ public class IndexBuildControllerTest extends
BaseIgniteAbstractTest {
@Test
void testStartBuildIndexesOnIndexCreate() {
- setPrimaryReplicaWhichExpiresInOneSecond(PARTITION_ID, NODE_NAME,
clock.now());
+ setPrimaryReplicaWhichExpiresInOneSecond(PARTITION_ID, NODE_NAME,
NODE_ID, clock.now());
clearInvocations(indexBuilder);
@@ -157,7 +159,7 @@ public class IndexBuildControllerTest extends
BaseIgniteAbstractTest {
void testStartBuildIndexesOnPrimaryReplicaElected() {
createIndex(INDEX_NAME);
- setPrimaryReplicaWhichExpiresInOneSecond(PARTITION_ID, NODE_NAME,
clock.now());
+ setPrimaryReplicaWhichExpiresInOneSecond(PARTITION_ID, NODE_NAME,
NODE_ID, clock.now());
verify(indexBuilder).scheduleBuildIndex(
eq(tableId()),
@@ -193,8 +195,8 @@ public class IndexBuildControllerTest extends
BaseIgniteAbstractTest {
@Test
void testStopBuildIndexesOnChangePrimaryReplica() {
- setPrimaryReplicaWhichExpiresInOneSecond(PARTITION_ID, NODE_NAME,
clock.now());
- setPrimaryReplicaWhichExpiresInOneSecond(PARTITION_ID, NODE_NAME +
"_other", clock.now());
+ setPrimaryReplicaWhichExpiresInOneSecond(PARTITION_ID, NODE_NAME,
NODE_ID, clock.now());
+ setPrimaryReplicaWhichExpiresInOneSecond(PARTITION_ID, NODE_NAME +
"_other", NODE_ID + "_other", clock.now());
verify(indexBuilder).stopBuildingIndexes(tableId(), PARTITION_ID);
}
@@ -207,7 +209,7 @@ public class IndexBuildControllerTest extends
BaseIgniteAbstractTest {
makeIndexAvailable(indexId);
- setPrimaryReplicaWhichExpiresInOneSecond(PARTITION_ID, NODE_NAME,
clock.now());
+ setPrimaryReplicaWhichExpiresInOneSecond(PARTITION_ID, NODE_NAME,
NODE_ID, clock.now());
verify(indexBuilder, never()).scheduleBuildIndex(
eq(tableId()),
@@ -242,8 +244,13 @@ public class IndexBuildControllerTest extends
BaseIgniteAbstractTest {
TableTestUtils.dropIndex(catalogManager, DEFAULT_SCHEMA_NAME,
indexName);
}
- private void setPrimaryReplicaWhichExpiresInOneSecond(int partitionId,
String leaseholder, HybridTimestamp startTime) {
- CompletableFuture<ReplicaMeta> replicaMetaFuture =
completedFuture(replicaMetaForOneSecond(leaseholder, startTime));
+ private void setPrimaryReplicaWhichExpiresInOneSecond(
+ int partitionId,
+ String leaseholder,
+ String leaseholderId,
+ HybridTimestamp startTime
+ ) {
+ CompletableFuture<ReplicaMeta> replicaMetaFuture =
completedFuture(replicaMetaForOneSecond(leaseholder, leaseholderId, startTime));
assertThat(placementDriver.setPrimaryReplicaMeta(0,
replicaId(partitionId), replicaMetaFuture), willCompleteSuccessfully());
}
@@ -260,8 +267,14 @@ public class IndexBuildControllerTest extends
BaseIgniteAbstractTest {
return new TablePartitionId(tableId(), partitionId);
}
- private ReplicaMeta replicaMetaForOneSecond(String leaseholder,
HybridTimestamp startTime) {
- return new Lease(leaseholder, startTime,
startTime.addPhysicalTime(1_000), new TablePartitionId(tableId(),
PARTITION_ID));
+ private ReplicaMeta replicaMetaForOneSecond(String leaseholder, String
leaseholderId, HybridTimestamp startTime) {
+ return new Lease(
+ leaseholder,
+ leaseholderId,
+ startTime,
+ startTime.addPhysicalTime(1_000),
+ new TablePartitionId(tableId(), PARTITION_ID)
+ );
}
private static class TestPlacementDriver extends
AbstractEventProducer<PrimaryReplicaEvent, PrimaryReplicaEventParameters>
implements
diff --git a/modules/placement-driver-api/build.gradle
b/modules/placement-driver-api/build.gradle
index a05a01e61e..4a6385f667 100644
--- a/modules/placement-driver-api/build.gradle
+++ b/modules/placement-driver-api/build.gradle
@@ -28,6 +28,7 @@ dependencies {
implementation libs.jetbrains.annotations
testFixturesImplementation project(':ignite-core')
+ testFixturesImplementation project(':ignite-network-api')
testFixturesImplementation libs.jetbrains.annotations
}
diff --git
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
index b7a3b404a7..73d887cbb4 100644
---
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
+++
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
@@ -27,6 +27,12 @@ import
org.apache.ignite.internal.replicator.ReplicationGroupId;
/**
* Service that provides an ability to await and retrieve primary replicas for
replication groups.
+ *
+ * <p>Note: if, during recovery, a component needs to act depending on whether the local node is the primary replica for some
+ * replication group, it should call {@link #getPrimaryReplica(ReplicationGroupId, HybridTimestamp)}, compare the local node's
+ * consistent ID and node ID with {@link ReplicaMeta#getLeaseholder()} and {@link ReplicaMeta#getLeaseholderId()}, and check that
+ * the lease has not yet expired according to {@link ReplicaMeta#getExpirationTime()}. Only then can the local node be considered
+ * the primary replica for the requested replication group.</p>
*/
// TODO: https://issues.apache.org/jira/browse/IGNITE-20646 Consider using
CLOCK_SKEW unaware await/getPrimaryReplica()
public interface PlacementDriver extends EventProducer<PrimaryReplicaEvent,
PrimaryReplicaEventParameters> {
diff --git
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/ReplicaMeta.java
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/ReplicaMeta.java
index c22b2fe3f6..01bbf819cd 100644
---
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/ReplicaMeta.java
+++
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/ReplicaMeta.java
@@ -19,29 +19,19 @@ package org.apache.ignite.internal.placementdriver;
import java.io.Serializable;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.jetbrains.annotations.Nullable;
-/**
- * Replica lease meta.
- */
+/** Replica lease meta. */
public interface ReplicaMeta extends Serializable {
- /**
- * Gets a leaseholder node consistent ID.
- *
- * @return Leaseholder or {@code null} if nothing holds the lease.
- */
- String getLeaseholder();
+ /** Gets a leaseholder node consistent ID (assigned to a node once),
{@code null} if nothing holds the lease. */
+ @Nullable String getLeaseholder();
+
+ /** Gets a leaseholder node ID (changes on every node startup), {@code
null} if nothing holds the lease. */
+ @Nullable String getLeaseholderId();
- /**
- * Gets a lease start timestamp.
- *
- * @return Lease start timestamp.
- */
+ /** Gets a lease start timestamp. */
HybridTimestamp getStartTime();
- /**
- * Gets a lease expiration timestamp.
- *
- * @return Lease expiration timestamp or {@code null} if nothing holds the
lease.
- */
+ /** Gets a lease expiration timestamp. */
HybridTimestamp getExpirationTime();
}
diff --git
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
index 56e0f4d35f..21be91c984 100644
---
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
+++
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
@@ -18,15 +18,18 @@
package org.apache.ignite.internal.placementdriver;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
+import java.util.function.Supplier;
import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -37,13 +40,26 @@ import org.jetbrains.annotations.TestOnly;
@TestOnly
public class TestPlacementDriver extends
AbstractEventProducer<PrimaryReplicaEvent, PrimaryReplicaEventParameters>
implements PlacementDriver {
- private final TestReplicaMetaImpl primaryReplica;
+ private final Supplier<TestReplicaMetaImpl> primaryReplicaSupplier;
@Nullable
private BiFunction<ReplicationGroupId, HybridTimestamp,
CompletableFuture<ReplicaMeta>> awaitPrimaryReplicaFunction = null;
- public TestPlacementDriver(String leaseholder) {
- this.primaryReplica = new TestReplicaMetaImpl(leaseholder);
+ /** Auxiliary constructor that will create replica meta by {@link
TestReplicaMetaImpl#TestReplicaMetaImpl(ClusterNode)} internally. */
+ public TestPlacementDriver(ClusterNode leaseholder) {
+ primaryReplicaSupplier = () -> new TestReplicaMetaImpl(leaseholder);
+ }
+
+ /**
+ * Auxiliary constructor that allows you to create {@link
TestPlacementDriver} in cases where the node ID will be known only after the
+ * start of the components/node. Will use {@link
TestReplicaMetaImpl#TestReplicaMetaImpl(ClusterNode)}.
+ */
+ public TestPlacementDriver(Supplier<ClusterNode> leaseholderSupplier) {
+ primaryReplicaSupplier = () -> new
TestReplicaMetaImpl(leaseholderSupplier.get());
+ }
+
+ public TestPlacementDriver(String leaseholder, String leaseholderId) {
+ primaryReplicaSupplier = () -> new TestReplicaMetaImpl(leaseholder,
leaseholderId);
}
@Override
@@ -57,12 +73,12 @@ public class TestPlacementDriver extends
AbstractEventProducer<PrimaryReplicaEve
return awaitPrimaryReplicaFunction.apply(groupId, timestamp);
}
- return completedFuture(primaryReplica);
+ return getReplicaMetaFuture();
}
@Override
public CompletableFuture<ReplicaMeta> getPrimaryReplica(ReplicationGroupId
replicationGroupId, HybridTimestamp timestamp) {
- return completedFuture(primaryReplica);
+ return getReplicaMetaFuture();
}
@Override
@@ -80,4 +96,12 @@ public class TestPlacementDriver extends
AbstractEventProducer<PrimaryReplicaEve
) {
this.awaitPrimaryReplicaFunction = awaitPrimaryReplicaFunction;
}
+
+ private CompletableFuture<ReplicaMeta> getReplicaMetaFuture() {
+ try {
+ return completedFuture(primaryReplicaSupplier.get());
+ } catch (Throwable t) {
+ return failedFuture(t);
+ }
+ }
}
diff --git
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestReplicaMetaImpl.java
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestReplicaMetaImpl.java
index 35d01de9ed..0dc2c4a51b 100644
---
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestReplicaMetaImpl.java
+++
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestReplicaMetaImpl.java
@@ -19,8 +19,11 @@ package org.apache.ignite.internal.placementdriver;
import static org.apache.ignite.internal.hlc.HybridTimestamp.MAX_VALUE;
import static org.apache.ignite.internal.hlc.HybridTimestamp.MIN_VALUE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
/** Test implementation of the {@link ReplicaMeta}. */
@@ -28,8 +31,11 @@ import org.jetbrains.annotations.TestOnly;
public class TestReplicaMetaImpl implements ReplicaMeta {
private static final long serialVersionUID = -382174507405586033L;
- /** A node that holds a lease. */
- private final String leaseholder;
+ /** Consistent ID of the node that holds the lease, {@code null} if nothing holds the lease. */
+ private final @Nullable String leaseholder;
+
+ /** ID of the node that holds the lease, {@code null} if nothing holds the lease. */
+ private final @Nullable String leaseholderId;
/** Lease start timestamp. The timestamp is assigned when the lease
created and is not changed when the lease is prolonged. */
private final HybridTimestamp startTime;
@@ -40,36 +46,80 @@ public class TestReplicaMetaImpl implements ReplicaMeta {
/**
* Creates a new primary meta with unbounded period.
*
- * @param leaseholder Lease holder.
+ * <p>Notes: Delegates creation to a {@link
TestReplicaMetaImpl#TestReplicaMetaImpl(String, String, HybridTimestamp,
HybridTimestamp)},
+ * where {@code leaseholder} is {@link ClusterNode#name()} and {@code
leaseholderId} is {@link ClusterNode#id()}.</p>
+ *
+ * @param leaseholder Lease holder, {@code null} if nothing holds the
lease.
*/
- public TestReplicaMetaImpl(String leaseholder) {
- this.leaseholder = leaseholder;
- this.startTime = MIN_VALUE;
- this.expirationTime = MAX_VALUE;
+ TestReplicaMetaImpl(@Nullable ClusterNode leaseholder) {
+ this(leaseholder, MIN_VALUE, MAX_VALUE);
+ }
+
+ /**
+ * Creates a new primary meta with unbounded period.
+ *
+ * @param leaseholder Lease holder consistent ID, {@code null} if nothing
holds the lease.
+ * @param leaseholderId Lease holder ID, {@code null} if nothing holds the
lease.
+ */
+ TestReplicaMetaImpl(@Nullable String leaseholder, @Nullable String
leaseholderId) {
+ this(leaseholder, leaseholderId, MIN_VALUE, MAX_VALUE);
}
/**
* Creates a new primary meta.
*
- * @param leaseholder Lease holder.
+ * <p>Notes: Delegates creation to a {@link
TestReplicaMetaImpl#TestReplicaMetaImpl(String, String, HybridTimestamp,
HybridTimestamp)},
+ * where {@code leaseholder} is {@link ClusterNode#name()} and {@code
leaseholderId} is {@link ClusterNode#id()}.</p>
+ *
+ * @param leaseholder Lease holder, {@code null} if nothing holds the
lease.
* @param startTime Start lease timestamp.
- * @param leaseExpirationTime Lease expiration timestamp.
+ * @param expirationTime Lease expiration timestamp.
*/
- public TestReplicaMetaImpl(
- String leaseholder,
+ public TestReplicaMetaImpl(@Nullable ClusterNode leaseholder,
HybridTimestamp startTime, HybridTimestamp expirationTime) {
+ this(
+ leaseholder == null ? null : leaseholder.name(),
+ leaseholder == null ? null : leaseholder.id(),
+ startTime,
+ expirationTime
+ );
+ }
+
+ /**
+ * Creates a new primary meta.
+ *
+ * @param leaseholder Lease holder consistent ID, {@code null} if nothing
holds the lease.
+ * @param leaseholderId Lease holder ID, {@code null} if nothing holds the
lease.
+ * @param startTime Start lease timestamp.
+ * @param expirationTime Lease expiration timestamp.
+ */
+ private TestReplicaMetaImpl(
+ @Nullable String leaseholder,
+ @Nullable String leaseholderId,
HybridTimestamp startTime,
- HybridTimestamp leaseExpirationTime
+ HybridTimestamp expirationTime
) {
+ assertEquals(
+ leaseholder == null,
+ leaseholderId == null,
+ String.format("leaseholder=%s, leaseholderId=%s", leaseholder,
leaseholderId)
+ );
+
this.leaseholder = leaseholder;
+ this.leaseholderId = leaseholderId;
this.startTime = startTime;
- this.expirationTime = leaseExpirationTime;
+ this.expirationTime = expirationTime;
}
@Override
- public String getLeaseholder() {
+ public @Nullable String getLeaseholder() {
return leaseholder;
}
+ @Override
+ public @Nullable String getLeaseholderId() {
+ return leaseholderId;
+ }
+
@Override
public HybridTimestamp getStartTime() {
return startTime;
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
index 6c8ee66c29..bdcbb41dfe 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
@@ -349,7 +349,7 @@ public class LeaseUpdater {
if (isLeaseOutdated(lease)) {
// New lease is granting.
writeNewLease(grpId, lease, candidate, renewedLeases,
toBeNegotiated);
- } else if (lease.isProlongable() &&
candidate.name().equals(lease.getLeaseholder())) {
+ } else if (lease.isProlongable() &&
candidate.id().equals(lease.getLeaseholderId())) {
// Old lease is renewing.
prolongLease(grpId, lease, renewedLeases);
}
@@ -391,7 +391,7 @@ public class LeaseUpdater {
var expirationTs = new HybridTimestamp(startTs.getPhysical() +
longLeaseInterval, 0);
- Lease renewedLease = new Lease(candidate.name(), startTs,
expirationTs, grpId);
+ Lease renewedLease = new Lease(candidate.name(), candidate.id(),
startTs, expirationTs, grpId);
renewedLeases.put(grpId, renewedLease);
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java
index b8bef14056..2a5ca15ca8 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java
@@ -17,30 +17,36 @@
package org.apache.ignite.internal.placementdriver.leases;
+import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
import static org.apache.ignite.internal.hlc.HybridTimestamp.MIN_VALUE;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
+import static org.apache.ignite.internal.util.ByteUtils.stringFromBytes;
+import static org.apache.ignite.internal.util.ByteUtils.stringToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.charset.StandardCharsets;
import java.util.Objects;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.util.ByteUtils;
+import org.jetbrains.annotations.Nullable;
/**
* A lease representation in memory.
* The real lease is stored in Meta storage.
*/
public class Lease implements ReplicaMeta {
- /** The object is used when nothing holds the lease. Empty lease is always
expired. */
- public static Lease EMPTY_LEASE = new Lease(null, MIN_VALUE, MIN_VALUE,
null);
+ private static final long serialVersionUID = 394641185393949608L;
- /** A node that holds a lease. */
- private final String leaseholder;
+ /** Node consistent ID (assigned to a node once), {@code null} if nothing
holds the lease. */
+ private final @Nullable String leaseholder;
+
+ /** Leaseholder node ID (changes on every node startup), {@code null} if
nothing holds the lease. */
+ private final @Nullable String leaseholderId;
/** The lease is accepted, when the holder knows about it and applies all
related obligations. */
private final boolean accepted;
@@ -54,44 +60,52 @@ public class Lease implements ReplicaMeta {
/** The lease is available to prolong in the same leaseholder. */
private final boolean prolongable;
- /** Id of replication group. */
+ /** ID of replication group. */
private final ReplicationGroupId replicationGroupId;
/**
* Creates a new lease.
*
- * @param leaseholder Lease holder.
+ * @param leaseholder Leaseholder node consistent ID (assigned to a node
once), {@code null} if nothing holds the lease.
+ * @param leaseholderId Leaseholder node ID (changes on every node
startup), {@code null} if nothing holds the lease.
* @param startTime Start lease timestamp.
* @param leaseExpirationTime Lease expiration timestamp.
- * @param replicationGroupId Id of replication group.
+ * @param replicationGroupId ID of replication group.
*/
public Lease(
- String leaseholder,
+ @Nullable String leaseholder,
+ @Nullable String leaseholderId,
HybridTimestamp startTime,
HybridTimestamp leaseExpirationTime,
ReplicationGroupId replicationGroupId
) {
- this(leaseholder, startTime, leaseExpirationTime, false, false,
replicationGroupId);
+ this(leaseholder, leaseholderId, startTime, leaseExpirationTime,
false, false, replicationGroupId);
}
/**
* The constructor.
*
- * @param leaseholder Lease holder.
+ * @param leaseholder Leaseholder node consistent ID (assigned to a node
once), {@code null} if nothing holds the lease.
+ * @param leaseholderId Leaseholder node ID (changes on every node
startup), {@code null} if nothing holds the lease.
* @param startTime Start lease timestamp.
* @param leaseExpirationTime Lease expiration timestamp.
* @param prolong Lease is available to prolong.
- * @param accepted The flag is true when the holder accepted the lease,
the false otherwise.
+ * @param accepted The flag is {@code true} when the holder accepted the
lease.
+ * @param replicationGroupId ID of replication group.
*/
public Lease(
- String leaseholder,
+ @Nullable String leaseholder,
+ @Nullable String leaseholderId,
HybridTimestamp startTime,
HybridTimestamp leaseExpirationTime,
boolean prolong,
boolean accepted,
ReplicationGroupId replicationGroupId
) {
+ assert (leaseholder == null) == (leaseholderId == null) :
"leaseholder=" + leaseholder + ", leaseholderId=" + leaseholderId;
+
this.leaseholder = leaseholder;
+ this.leaseholderId = leaseholderId;
this.startTime = startTime;
this.expirationTime = leaseExpirationTime;
this.prolongable = prolong;
@@ -106,17 +120,10 @@ public class Lease implements ReplicaMeta {
* @return A new lease which will have the same properties except of
expiration timestamp.
*/
public Lease prolongLease(HybridTimestamp to) {
- assert accepted : "The lease should be accepted by leaseholder before
prolongation ["
- + "leaseholder=" + leaseholder
- + ", expirationTime=" + expirationTime
- + ", prolongTo=" + to + ']';
-
- assert prolongable : "The lease should be available to prolong ["
- + "leaseholder=" + leaseholder
- + ", expirationTime=" + expirationTime
- + ", prolongTo=" + to + ']';
+ assert accepted : "The lease should be accepted by leaseholder before
prolongation: [lease=" + this + ", to=" + to + ']';
+ assert prolongable : "The lease should be available to prolong:
[lease=" + this + ", to=" + to + ']';
- return new Lease(leaseholder, startTime, to, true, true,
replicationGroupId);
+ return new Lease(leaseholder, leaseholderId, startTime, to, true,
true, replicationGroupId);
}
/**
@@ -126,11 +133,9 @@ public class Lease implements ReplicaMeta {
* @return A accepted lease.
*/
public Lease acceptLease(HybridTimestamp to) {
- assert !accepted : "The lease is already accepted ["
- + "leaseholder=" + leaseholder
- + ", expirationTime=" + expirationTime + ']';
+ assert !accepted : "The lease is already accepted: " + this;
- return new Lease(leaseholder, startTime, to, true, true,
replicationGroupId);
+ return new Lease(leaseholder, leaseholderId, startTime, to, true,
true, replicationGroupId);
}
/**
@@ -139,18 +144,21 @@ public class Lease implements ReplicaMeta {
* @return Denied lease.
*/
public Lease denyLease() {
- assert accepted : "The lease is not accepted ["
- + "leaseholder=" + leaseholder
- + ", expirationTime=" + expirationTime + ']';
+ assert accepted : "The lease is not accepted: " + this;
- return new Lease(leaseholder, startTime, expirationTime, false, true,
replicationGroupId);
+ return new Lease(leaseholder, leaseholderId, startTime,
expirationTime, false, true, replicationGroupId);
}
@Override
- public String getLeaseholder() {
+ public @Nullable String getLeaseholder() {
return leaseholder;
}
+ @Override
+ public @Nullable String getLeaseholderId() {
+ return leaseholderId;
+ }
+
@Override
public HybridTimestamp getStartTime() {
return startTime;
@@ -161,24 +169,17 @@ public class Lease implements ReplicaMeta {
return expirationTime;
}
- /**
- * Gets a prolongation flag.
- *
- * @return True if the lease might be prolonged, false otherwise.
- */
+ /** Returns {@code true} if the lease might be prolonged. */
public boolean isProlongable() {
return prolongable;
}
- /**
- * Gets accepted flag.
- *
- * @return True if the lease accepted, false otherwise.
- */
+ /** Returns {@code true} if the lease accepted. */
public boolean isAccepted() {
return accepted;
}
+ /** Returns ID of replication group. */
public ReplicationGroupId replicationGroupId() {
return replicationGroupId;
}
@@ -189,28 +190,25 @@ public class Lease implements ReplicaMeta {
* @return Lease representation in a byte array.
*/
public byte[] bytes() {
- byte[] leaseholderBytes = leaseholder == null ? null :
leaseholder.getBytes(StandardCharsets.UTF_8);
- short leaseholderBytesSize = (short) (leaseholderBytes == null ? 0 :
leaseholderBytes.length);
+ byte[] leaseholderBytes = stringToBytes(leaseholder);
+ byte[] leaseholderIdBytes = stringToBytes(leaseholderId);
+ byte[] groupIdBytes = toBytes(replicationGroupId);
- byte[] groupIdBytes = replicationGroupId == null ? null :
ByteUtils.toBytes(replicationGroupId);
- short groupIdBytesSize = (short) (groupIdBytes == null ? 0 :
groupIdBytes.length);
+ int bufSize = 2 // accepted + prolongable
+ + HYBRID_TIMESTAMP_SIZE * 2 // startTime + expirationTime
+ + bytesSizeForWrite(leaseholderBytes) +
bytesSizeForWrite(leaseholderIdBytes) + bytesSizeForWrite(groupIdBytes);
- int bufSize = leaseholderBytesSize + groupIdBytesSize + Short.BYTES *
2 + HYBRID_TIMESTAMP_SIZE * 2 + 1 + 1;
+ ByteBuffer buf = ByteBuffer.allocate(bufSize).order(LITTLE_ENDIAN);
- ByteBuffer buf =
ByteBuffer.allocate(bufSize).order(ByteOrder.LITTLE_ENDIAN);
+ putBoolean(buf, accepted);
+ putBoolean(buf, prolongable);
- buf.put((byte) (accepted ? 1 : 0));
- buf.put((byte) (prolongable ? 1 : 0));
- buf.putLong(startTime.longValue());
- buf.putLong(expirationTime.longValue());
- buf.putShort(leaseholderBytesSize);
- if (leaseholderBytes != null) {
- buf.put(leaseholderBytes);
- }
- buf.putShort(groupIdBytesSize);
- if (groupIdBytes != null) {
- buf.put(groupIdBytes);
- }
+ putHybridTimestamp(buf, startTime);
+ putHybridTimestamp(buf, expirationTime);
+
+ putBytes(buf, leaseholderBytes);
+ putBytes(buf, leaseholderIdBytes);
+ putBytes(buf, groupIdBytes);
return buf.array();
}
@@ -222,31 +220,29 @@ public class Lease implements ReplicaMeta {
* @return Decoded lease.
*/
public static Lease fromBytes(ByteBuffer buf) {
- boolean accepted = buf.get() == 1;
- boolean prolongable = buf.get() == 1;
- HybridTimestamp startTime = hybridTimestamp(buf.getLong());
- HybridTimestamp expirationTime = hybridTimestamp(buf.getLong());
- short leaseholderBytesSize = buf.getShort();
- String leaseholder;
- if (leaseholderBytesSize > 0) {
- byte[] leaseholderBytes = new byte[leaseholderBytesSize];
- buf.get(leaseholderBytes);
- leaseholder = new String(leaseholderBytes, StandardCharsets.UTF_8);
- } else {
- leaseholder = null;
- }
+ assert buf.order() == LITTLE_ENDIAN;
- short groupIdBytesSize = buf.getShort();
- ReplicationGroupId groupId;
- if (groupIdBytesSize > 0) {
- byte[] groupIdBytes = new byte[groupIdBytesSize];
- buf.get(groupIdBytes);
- groupId = ByteUtils.fromBytes(groupIdBytes);
- } else {
- groupId = null;
- }
+ boolean accepted = getBoolean(buf);
+ boolean prolongable = getBoolean(buf);
+
+ HybridTimestamp startTime = getHybridTimestamp(buf);
+ HybridTimestamp expirationTime = getHybridTimestamp(buf);
- return new Lease(leaseholder, startTime, expirationTime, prolongable,
accepted, groupId);
+ String leaseholder = stringFromBytes(getBytes(buf));
+ String leaseholderId = stringFromBytes(getBytes(buf));
+
+ ReplicationGroupId groupId = ByteUtils.fromBytes(getBytes(buf));
+
+ return new Lease(leaseholder, leaseholderId, startTime,
expirationTime, prolongable, accepted, groupId);
+ }
+
+ /**
+ * Returns a lease that no one holds and is always expired.
+ *
+ * @param replicationGroupId Replication group ID.
+ */
+ public static Lease emptyLease(ReplicationGroupId replicationGroupId) {
+ return new Lease(null, null, MIN_VALUE, MIN_VALUE, replicationGroupId);
}
@Override
@@ -262,14 +258,59 @@ public class Lease implements ReplicaMeta {
if (o == null || getClass() != o.getClass()) {
return false;
}
- Lease lease = (Lease) o;
- return accepted == lease.accepted && prolongable == lease.prolongable
&& Objects.equals(leaseholder, lease.leaseholder)
- && Objects.equals(startTime, lease.startTime) &&
Objects.equals(expirationTime, lease.expirationTime)
- && Objects.equals(replicationGroupId,
lease.replicationGroupId);
+ Lease other = (Lease) o;
+ return accepted == other.accepted && prolongable == other.prolongable
+ && Objects.equals(leaseholder, other.leaseholder) &&
Objects.equals(leaseholderId, other.leaseholderId)
+ && Objects.equals(startTime, other.startTime) &&
Objects.equals(expirationTime, other.expirationTime)
+ && Objects.equals(replicationGroupId,
other.replicationGroupId);
}
@Override
public int hashCode() {
- return Objects.hash(leaseholder, accepted, startTime, expirationTime,
prolongable, replicationGroupId);
+ return Objects.hash(leaseholder, leaseholderId, accepted, startTime,
expirationTime, prolongable, replicationGroupId);
+ }
+
+ private static int bytesSizeForWrite(byte @Nullable [] bytes) {
+ return Integer.BYTES + (bytes == null ? 0 : bytes.length);
+ }
+
+ private static void putBoolean(ByteBuffer buffer, boolean b) {
+ buffer.put((byte) (b ? 1 : 0));
+ }
+
+ private static boolean getBoolean(ByteBuffer buffer) {
+ return buffer.get() == 1;
+ }
+
+ private static void putHybridTimestamp(ByteBuffer buffer, HybridTimestamp
hybridTimestamp) {
+ buffer.putLong(hybridTimestamp.longValue());
+ }
+
+ private static HybridTimestamp getHybridTimestamp(ByteBuffer buffer) {
+ return hybridTimestamp(buffer.getLong());
+ }
+
+ private static void putBytes(ByteBuffer buffer, byte @Nullable [] bytes) {
+ buffer.putInt(bytes == null ? -1 : bytes.length);
+
+ if (bytes != null) {
+ buffer.put(bytes);
+ }
+ }
+
+ private static byte @Nullable [] getBytes(ByteBuffer buffer) {
+ int bytesLen = buffer.getInt();
+
+ if (bytesLen < 0) {
+ return null;
+ } else if (bytesLen == 0) {
+ return BYTE_EMPTY_ARRAY;
+ }
+
+ byte[] bytes = new byte[bytesLen];
+
+ buffer.get(bytes);
+
+ return bytes;
}
}
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
index 0e045e2f58..a08dc28126 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
@@ -27,7 +27,7 @@ import static
org.apache.ignite.internal.hlc.HybridTimestamp.MIN_VALUE;
import static
org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_LEASES_KEY;
import static
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED;
import static
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED;
-import static
org.apache.ignite.internal.placementdriver.leases.Lease.EMPTY_LEASE;
+import static
org.apache.ignite.internal.placementdriver.leases.Lease.emptyLease;
import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
@@ -145,7 +145,9 @@ public class LeaseTracker extends
AbstractEventProducer<PrimaryReplicaEvent, Pri
assert leases != null : "Leases not initialized, probably the local
placement driver actor hasn't started lease tracking.";
- return leases.leaseByGroupId().getOrDefault(grpId, EMPTY_LEASE);
+ Lease lease = leases.leaseByGroupId().get(grpId);
+
+ return lease == null ? emptyLease(grpId) : lease;
}
/** Returns collection of leases, ordered by replication group. */
@@ -230,7 +232,7 @@ public class LeaseTracker extends
AbstractEventProducer<PrimaryReplicaEvent, Pri
@Override
public CompletableFuture<ReplicaMeta> getPrimaryReplica(ReplicationGroupId
replicationGroupId, HybridTimestamp timestamp) {
return inBusyLockAsync(busyLock, () -> {
- Lease lease =
leases.leaseByGroupId().getOrDefault(replicationGroupId, EMPTY_LEASE);
+ Lease lease = getLease(replicationGroupId);
if (lease.isAccepted() &&
lease.getExpirationTime().after(timestamp)) {
return completedFuture(lease);
@@ -240,7 +242,7 @@ public class LeaseTracker extends
AbstractEventProducer<PrimaryReplicaEvent, Pri
.clusterTime()
.waitFor(timestamp.addPhysicalTime(CLOCK_SKEW))
.thenApply(ignored -> inBusyLock(busyLock, () -> {
- Lease lease0 =
leases.leaseByGroupId().getOrDefault(replicationGroupId, EMPTY_LEASE);
+ Lease lease0 = getLease(replicationGroupId);
if (lease0.isAccepted() &&
lease0.getExpirationTime().after(timestamp)) {
return lease0;
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java
index 7974b386f9..ca303ef5f4 100644
---
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java
@@ -77,9 +77,16 @@ public class LeaseTrackerTest extends BaseIgniteAbstractTest
{
});
TablePartitionId partId0 = new TablePartitionId(0, 0);
- Lease lease0 = new Lease("notAccepted", new HybridTimestamp(1, 0), new
HybridTimestamp(1000, 0), partId0);
TablePartitionId partId1 = new TablePartitionId(0, 1);
- Lease lease1 = new Lease("accepted", new HybridTimestamp(1, 0), new
HybridTimestamp(1000, 0), partId1)
+
+ HybridTimestamp startTime = new HybridTimestamp(1, 0);
+ HybridTimestamp expirationTime = new HybridTimestamp(1000, 0);
+
+ String leaseholder0 = "notAccepted";
+ String leaseholder1 = "accepted";
+
+ Lease lease0 = new Lease(leaseholder0, leaseholder0 + "_id",
startTime, expirationTime, partId0);
+ Lease lease1 = new Lease(leaseholder1, leaseholder1 + "_id",
startTime, expirationTime, partId1)
.acceptLease(new HybridTimestamp(2000, 0));
// In entry0, there are leases for partition ids partId0 and partId1.
In entry1, there is only partId0, so partId1 is expired.
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
index dd1a923bd9..571ffee74d 100644
---
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
@@ -114,7 +114,7 @@ public class LeaseUpdaterTest extends
BaseIgniteAbstractTest {
when(mcEntriesCursor.iterator()).thenReturn(List.of(entry).iterator());
when(clusterService.messagingService()).thenReturn(mock(MessagingService.class));
lenient().when(leaseTracker.leasesCurrent()).thenReturn(leases);
-
lenient().when(leaseTracker.getLease(any(ReplicationGroupId.class))).thenReturn(Lease.EMPTY_LEASE);
+
lenient().when(leaseTracker.getLease(any(ReplicationGroupId.class))).then(i ->
Lease.emptyLease(i.getArgument(0)));
when(metaStorageManager.recoveryFinishedFuture()).thenReturn(completedFuture(1L));
when(metaStorageManager.getLocally(any(ByteArray.class),
any(ByteArray.class), anyLong())).thenReturn(mcEntriesCursor);
when(topologyService.logicalTopologyOnLeader()).thenReturn(completedFuture(new
LogicalTopologySnapshot(1, List.of(node))));
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
index 52920e6274..855ad282be 100644
---
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
@@ -74,8 +74,12 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
private static final TablePartitionId GROUP_1 = new TablePartitionId(1000,
0);
private static final String LEASEHOLDER_1 = "leaseholder1";
+
+ private static final String LEASEHOLDER_ID_1 = "leaseholder1_id";
+
private static final Lease LEASE_FROM_1_TO_5_000 = new Lease(
LEASEHOLDER_1,
+ LEASEHOLDER_ID_1,
new HybridTimestamp(1, 0),
new HybridTimestamp(5_000, 0),
false,
@@ -85,6 +89,7 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
private static final Lease LEASE_FROM_1_TO_15_000 = new Lease(
LEASEHOLDER_1,
+ LEASEHOLDER_ID_1,
new HybridTimestamp(1, 0),
new HybridTimestamp(15_000, 0),
false,
@@ -94,6 +99,7 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
private static final Lease LEASE_FROM_15_000_TO_30_000 = new Lease(
LEASEHOLDER_1,
+ LEASEHOLDER_ID_1,
new HybridTimestamp(15_000, 0),
new HybridTimestamp(30_000, 0),
false,
@@ -468,6 +474,7 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
Lease neighborGroupLease = new Lease(
LEASEHOLDER_1,
+ LEASEHOLDER_ID_1,
new HybridTimestamp(1, 0),
new HybridTimestamp(15_000, 0),
false,
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseSerializationTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseSerializationTest.java
index 9c272353b0..c2df7905cf 100644
---
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseSerializationTest.java
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseSerializationTest.java
@@ -17,61 +17,82 @@
package org.apache.ignite.internal.placementdriver.leases;
-import static
org.apache.ignite.internal.placementdriver.leases.Lease.fromBytes;
+import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
import java.util.ArrayList;
-import java.util.List;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;
-/**
- * Tests for lease encoding and decoding from byte arrays.
- */
+/** Tests for lease encoding and decoding from byte arrays. */
public class LeaseSerializationTest {
@Test
- public void test() {
- Lease lease;
-
+ public void testLeaseSerialization() {
long now = System.currentTimeMillis();
ReplicationGroupId groupId = new TablePartitionId(1, 1);
- lease = Lease.EMPTY_LEASE;
- assertEquals(lease,
fromBytes(ByteBuffer.wrap(lease.bytes()).order(ByteOrder.LITTLE_ENDIAN)));
+ checksSerialization(Lease.emptyLease(groupId));
- lease = new Lease("node1", new HybridTimestamp(now, 1), new
HybridTimestamp(now + 1_000_000, 100), true, true, groupId);
- assertEquals(lease,
fromBytes(ByteBuffer.wrap(lease.bytes()).order(ByteOrder.LITTLE_ENDIAN)));
+ checksSerialization(newLease("node1", timestamp(now, 1), timestamp(now
+ 1_000_000, 100), true, true, groupId));
- lease = new Lease("node1", new HybridTimestamp(now, 1), new
HybridTimestamp(now + 1_000_000, 100), false, false, groupId);
- assertEquals(lease,
fromBytes(ByteBuffer.wrap(lease.bytes()).order(ByteOrder.LITTLE_ENDIAN)));
+ checksSerialization(newLease("node1", timestamp(now, 1), timestamp(now
+ 1_000_000, 100), false, false, groupId));
- lease = new Lease("node1", new HybridTimestamp(now, 1), new
HybridTimestamp(now + 1_000_000, 100), false, true, groupId);
- assertEquals(lease,
fromBytes(ByteBuffer.wrap(lease.bytes()).order(ByteOrder.LITTLE_ENDIAN)));
+ checksSerialization(newLease("node1", timestamp(now, 1), timestamp(now
+ 1_000_000, 100), false, true, groupId));
- lease = new Lease("node1", new HybridTimestamp(now, 1), new
HybridTimestamp(now + 1_000_000, 100), true, false, groupId);
- assertEquals(lease,
fromBytes(ByteBuffer.wrap(lease.bytes()).order(ByteOrder.LITTLE_ENDIAN)));
+ checksSerialization(newLease("node1", timestamp(now, 1), timestamp(now
+ 1_000_000, 100), true, false, groupId));
- lease = new Lease(null, new HybridTimestamp(1, 1), new
HybridTimestamp(2 + 1_000_000, 100), true, true, groupId);
- assertEquals(lease,
fromBytes(ByteBuffer.wrap(lease.bytes()).order(ByteOrder.LITTLE_ENDIAN)));
+ checksSerialization(newLease(null, timestamp(1, 1), timestamp(2 +
1_000_000, 100), true, true, groupId));
- lease = new Lease("node" + new String(new byte[1000]), new
HybridTimestamp(1, 1), new HybridTimestamp(2, 100), false, false,
- groupId);
- assertEquals(lease,
fromBytes(ByteBuffer.wrap(lease.bytes()).order(ByteOrder.LITTLE_ENDIAN)));
+ checksSerialization(newLease("node" + new String(new byte[1000]),
timestamp(1, 1), timestamp(2, 100), false, false, groupId));
}
@Test
- public void leaseBatchTest() {
- List<Lease> leases = new ArrayList<>();
+ public void testLeaseBatchSerialization() {
+ var leases = new ArrayList<Lease>();
+
ReplicationGroupId groupId = new TablePartitionId(1, 1);
for (int i = 0; i < 25; i++) {
- leases.add(new Lease("node" + i, new HybridTimestamp(1, i), new
HybridTimestamp(1, i + 1), i % 2 == 0, i % 2 == 1, groupId));
+ leases.add(newLease("node" + i, timestamp(1, i), timestamp(1, i +
1), i % 2 == 0, i % 2 == 1, groupId));
}
- assertEquals(leases, LeaseBatch.fromBytes(ByteBuffer.wrap(new
LeaseBatch(leases).bytes()).order(ByteOrder.LITTLE_ENDIAN)).leases());
+ byte[] leaseBatchBytes = new LeaseBatch(leases).bytes();
+
+ assertEquals(leases,
LeaseBatch.fromBytes(wrap(leaseBatchBytes)).leases());
+ }
+
+ private static void checksSerialization(Lease lease) {
+ assertEquals(lease, Lease.fromBytes(wrap(lease.bytes())));
+ }
+
+ private static Lease newLease(
+ @Nullable String leaseholder,
+ HybridTimestamp startTime,
+ HybridTimestamp expirationTime,
+ boolean prolong,
+ boolean accepted,
+ ReplicationGroupId replicationGroupId
+ ) {
+ return new Lease(
+ leaseholder,
+ leaseholder == null ? null : leaseholder + "_id",
+ startTime,
+ expirationTime,
+ prolong,
+ accepted,
+ replicationGroupId
+ );
+ }
+
+ private static HybridTimestamp timestamp(long physical, int logical) {
+ return new HybridTimestamp(physical, logical);
+ }
+
+ private static ByteBuffer wrap(byte[] bytes) {
+ return ByteBuffer.wrap(bytes).order(LITTLE_ENDIAN);
}
}
diff --git
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
index 67164642fa..72e0dae931 100644
---
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
+++
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
@@ -23,6 +23,7 @@ import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.raft.PeersAndLearners.fromConsistentIds;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.CollectionUtils.first;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -40,6 +41,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
@@ -67,7 +69,9 @@ import
org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import
org.apache.ignite.internal.replicator.message.TestReplicaMessagesFactory;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.topology.LogicalTopologyServiceTestImpl;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NetworkMessageHandler;
@@ -85,7 +89,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
*/
@ExtendWith(ConfigurationExtension.class)
public class ItPlacementDriverReplicaSideTest extends IgniteAbstractTest {
- public static final int BASE_PORT = 1234;
+ private static final int BASE_PORT = 1234;
private static final TestReplicationGroupId GROUP_ID = new
TestReplicationGroupId("group_1");
@@ -96,7 +100,7 @@ public class ItPlacementDriverReplicaSideTest extends
IgniteAbstractTest {
@InjectConfiguration("mock {retryTimeout=2000, responseTimeout=1000}")
private RaftConfiguration raftConfiguration;
- private HybridClock clock = new HybridClockImpl();
+ private final HybridClock clock = new HybridClockImpl();
private Set<String> placementDriverNodeNames;
private Set<String> nodeNames;
@@ -107,12 +111,12 @@ public class ItPlacementDriverReplicaSideTest extends
IgniteAbstractTest {
/** Cluster service by node name. */
private Map<String, ClusterService> clusterServices;
- private Map<String, ReplicaManager> replicaManagers = new HashMap<>();
- private Map<String, Loza> raftManagers = new HashMap<>();
- private Map<String, TopologyAwareRaftGroupServiceFactory>
raftClientFactory = new HashMap<>();
+ private final Map<String, ReplicaManager> replicaManagers = new
HashMap<>();
+ private final Map<String, Loza> raftManagers = new HashMap<>();
+ private final Map<String, TopologyAwareRaftGroupServiceFactory>
raftClientFactory = new HashMap<>();
/** List of services to have to close before the test will be completed. */
- private List<Closeable> servicesToClose = new ArrayList<>();
+ private final List<Closeable> servicesToClose = new ArrayList<>();
@BeforeEach
public void beforeTest(TestInfo testInfo) {
@@ -127,8 +131,10 @@ public class ItPlacementDriverReplicaSideTest extends
IgniteAbstractTest {
when(cmgManager.metaStorageNodes()).thenReturn(completedFuture(placementDriverNodeNames));
+ Supplier<ClusterNode> primaryReplicaSupplier = () ->
first(clusterServices.values()).topologyService().localMember();
+
for (String nodeName : nodeNames) {
- var clusterService = clusterServices.get(nodeName);
+ ClusterService clusterService = clusterServices.get(nodeName);
RaftGroupEventsClientListener eventsClientListener = new
RaftGroupEventsClientListener();
@@ -157,7 +163,7 @@ public class ItPlacementDriverReplicaSideTest extends
IgniteAbstractTest {
cmgManager,
clock,
Set.of(ReplicaMessageTestGroup.class),
- new TestPlacementDriver(nodeName)
+ new TestPlacementDriver(primaryReplicaSupplier)
);
replicaManagers.put(nodeName, replicaManager);
@@ -184,9 +190,7 @@ public class ItPlacementDriverReplicaSideTest extends
IgniteAbstractTest {
@AfterEach
public void afterTest() throws Exception {
- for (Closeable cl : servicesToClose) {
- cl.close();
- }
+ IgniteUtils.closeAll(servicesToClose);
}
/**
diff --git
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
index 3a20e13748..810b701a53 100644
---
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
+++
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
@@ -118,7 +118,7 @@ public class PlacementDriverReplicaSideTest extends
BaseIgniteAbstractTest {
raftClient,
LOCAL_NODE,
executor,
- new TestPlacementDriver(LOCAL_NODE.name())
+ new TestPlacementDriver(LOCAL_NODE)
);
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 46efff26f4..bca46ab13c 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -869,7 +869,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
var clusterStateStorage = new TestClusterStateStorage();
var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
- var placementDriver = new TestPlacementDriver(name);
+ var placementDriver = new TestPlacementDriver(() ->
clusterService.topologyService().localMember());
var clusterInitializer = new ClusterInitializer(
clusterService,
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 732ae9fc07..1eecdd54fc 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -260,7 +260,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
- var placementDriver = new TestPlacementDriver(name);
+ var placementDriver = new TestPlacementDriver(() ->
clusterSvc.topologyService().localMember());
var clusterInitializer = new ClusterInitializer(
clusterSvc,
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index cd470cae7f..36829d6fe8 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -73,7 +73,6 @@ import org.junit.jupiter.api.Test;
* Tests execution flow of TableScanNode.
*/
public class TableScanNodeExecutionTest extends
AbstractExecutionTest<Object[]> {
-
private final LinkedList<AutoCloseable> closeables = new LinkedList<>();
// Ensures that all data from TableScanNode is being propagated correctly.
@@ -112,13 +111,15 @@ public class TableScanNodeExecutionTest extends
AbstractExecutionTest<Object[]>
ReplicaService replicaSvc = mock(ReplicaService.class,
RETURNS_DEEP_STUBS);
+ String leaseholder = "local";
+
TxManagerImpl txManager = new TxManagerImpl(
replicaSvc,
new HeapLockManager(),
new HybridClockImpl(),
new TransactionIdGenerator(0xdeadbeef),
- () -> "local",
- new TestPlacementDriver("local")
+ () -> leaseholder,
+ new TestPlacementDriver(leaseholder, leaseholder)
);
txManager.start();
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index fddf2b1e69..3db0aa2e3d 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -120,7 +120,9 @@ import org.junit.jupiter.api.extension.ExtendWith;
public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<PartitionListener> {
private static final String NODE_NAME = "node1";
- private static final TestPlacementDriver TEST_PLACEMENT_DRIVER = new
TestPlacementDriver(NODE_NAME);
+ private static final String NODE_ID = "node1";
+
+ private static final TestPlacementDriver TEST_PLACEMENT_DRIVER = new
TestPlacementDriver(NODE_NAME, NODE_ID);
/** Factory to create RAFT command messages. */
private final TableMessagesFactory msgFactory = new TableMessagesFactory();
@@ -165,7 +167,7 @@ public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<Parti
private final ReplicaService replicaService = mock(ReplicaService.class,
RETURNS_DEEP_STUBS);
private final Function<String, ClusterNode> consistentIdToNode = addr
- -> new ClusterNodeImpl(NODE_NAME, NODE_NAME, new
NetworkAddress(addr, 3333));
+ -> new ClusterNodeImpl(NODE_ID, NODE_NAME, new
NetworkAddress(addr, 3333));
private final HybridClock hybridClock = new HybridClockImpl();
@@ -209,7 +211,7 @@ public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<Parti
new HeapLockManager(),
hybridClock,
new TransactionIdGenerator(i),
- () -> NODE_NAME,
+ () -> NODE_ID,
TEST_PLACEMENT_DRIVER
);
@@ -225,7 +227,7 @@ public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<Parti
new HeapLockManager(),
hybridClock,
new TransactionIdGenerator(-1),
- () -> NODE_NAME,
+ () -> NODE_ID,
TEST_PLACEMENT_DRIVER
);
@@ -480,7 +482,7 @@ public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<Parti
new HeapLockManager(),
hybridClock,
new TransactionIdGenerator(index),
- () -> NODE_NAME,
+ () -> NODE_ID,
TEST_PLACEMENT_DRIVER
);
txMgr.start();
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index 0d8af5898e..91973f706f 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -82,6 +82,8 @@ import org.junit.jupiter.api.TestInfo;
* Tests handling requests from {@link ReplicaService} to {@link
ReplicaManager} when the {@link Replica} is not started.
*/
public class ReplicaUnavailableTest extends IgniteAbstractTest {
+ private static final String NODE_NAME = "client";
+
private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(
1,
new Column[]{new Column("key", NativeTypes.INT64, false)},
@@ -102,17 +104,13 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
private ClusterService clusterService;
- private NetworkAddress networkAddress;
-
- private String name = "client";
-
@BeforeEach
public void setup() {
- networkAddress = new NetworkAddress(getLocalAddress(), NODE_PORT_BASE
+ 1);
+ var networkAddress = new NetworkAddress(getLocalAddress(),
NODE_PORT_BASE + 1);
var nodeFinder = new StaticNodeFinder(List.of(networkAddress));
- clusterService = startNode(testInfo, name, NODE_PORT_BASE + 1,
nodeFinder);
+ clusterService = startNode(testInfo, NODE_NAME, NODE_PORT_BASE + 1,
nodeFinder);
replicaService = new ReplicaService(clusterService.messagingService(),
clock);
@@ -122,12 +120,12 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
when(cmgManager.metaStorageNodes()).thenReturn(completedFuture(Set.of()));
replicaManager = new ReplicaManager(
- name,
+ NODE_NAME,
clusterService,
cmgManager,
clock,
Set.of(TableMessageGroup.class, TxMessageGroup.class),
- new TestPlacementDriver(name)
+ new
TestPlacementDriver(clusterService.topologyService().localMember())
);
replicaManager.start();
@@ -159,7 +157,7 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
tablePartitionId,
completedFuture(null),
(request0, senderId) -> completedFuture(new
ReplicaResult(replicaMessageFactory.replicaResponse()
- .result(Integer.valueOf(5))
+ .result(5)
.build(), null)),
mock(TopologyAwareRaftGroupService.class),
new PendingComparableValuesTracker<>(0L)
@@ -261,7 +259,7 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
tablePartitionId,
new CompletableFuture<>(),
(request0, senderId) -> completedFuture(new
ReplicaResult(replicaMessageFactory.replicaResponse()
- .result(Integer.valueOf(5))
+ .result(5)
.build(), null)),
mock(TopologyAwareRaftGroupService.class),
new PendingComparableValuesTracker<>(0L)
@@ -287,7 +285,7 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
assertEquals(REPLICA_TIMEOUT_ERR, ((ReplicationTimeoutException)
unwrapCause(e0)).code());
}
- private BinaryRow createKeyValueRow(long id, long value) {
+ private static BinaryRow createKeyValueRow(long id, long value) {
RowAssembler rowBuilder = new RowAssembler(SCHEMA);
rowBuilder.appendLong(id);
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 2b0fe309f9..eb2771148d 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -150,7 +150,7 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
new HybridClockImpl(),
new TransactionIdGenerator(0xdeadbeef),
clusterNode::id,
- new TestPlacementDriver(clusterNode.name())
+ new TestPlacementDriver(clusterNode)
) {
@Override
public CompletableFuture<Void> finish(
@@ -260,7 +260,7 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
replicaService,
new HybridClockImpl(),
observableTimestampTracker,
- new TestPlacementDriver(clusterNode.name())
+ new TestPlacementDriver(clusterNode)
);
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index a14b00977a..ae65c20be9 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -713,7 +713,7 @@ public class TableManagerTest extends IgniteAbstractTest {
new AlwaysSyncedSchemaSyncService(),
catalogManager,
new HybridTimestampTracker(),
- new TestPlacementDriver(NODE_NAME)
+ new TestPlacementDriver(node)
) {
@Override
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java
index 3341a10aea..9a75e90c77 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java
@@ -65,6 +65,7 @@ import
org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
import org.apache.ignite.internal.tx.test.TestTransactionIds;
import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterNodeImpl;
import org.apache.ignite.network.NetworkAddress;
import org.junit.jupiter.api.BeforeEach;
@@ -78,7 +79,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
*/
@ExtendWith(MockitoExtension.class)
public class PartitionReplicaListenerDurableUnlockTest extends
IgniteAbstractTest {
- private static final String LOCAL_NODE_NAME = "node1";
+ private static final ClusterNode LOCAL_NODE = new ClusterNodeImpl("node1",
"node1", NetworkAddress.from("127.0.0.1:127"));
private static final int PART_ID = 0;
@@ -87,7 +88,7 @@ public class PartitionReplicaListenerDurableUnlockTest
extends IgniteAbstractTes
/** Hybrid clock. */
private final HybridClock clock = new HybridClockImpl();
- private final TestPlacementDriver placementDriver = new
TestPlacementDriver(LOCAL_NODE_NAME);
+ private final TestPlacementDriver placementDriver = new
TestPlacementDriver(LOCAL_NODE);
/** The storage stores transaction states. */
private final TestTxStateStorage txStateStorage = new TestTxStateStorage();
@@ -136,7 +137,7 @@ public class PartitionReplicaListenerDurableUnlockTest
extends IgniteAbstractTes
mock(TransactionStateResolver.class),
mock(StorageUpdateHandler.class),
mock(Schemas.class),
- new ClusterNodeImpl("node1", LOCAL_NODE_NAME,
NetworkAddress.from("127.0.0.1:127")),
+ LOCAL_NODE,
mock(SchemaSyncService.class),
mock(CatalogService.class),
placementDriver
@@ -162,7 +163,7 @@ public class PartitionReplicaListenerDurableUnlockTest
extends IgniteAbstractTes
return completedFuture(null);
};
- PrimaryReplicaEventParameters parameters = new
PrimaryReplicaEventParameters(0, part0, LOCAL_NODE_NAME);
+ PrimaryReplicaEventParameters parameters = new
PrimaryReplicaEventParameters(0, part0, LOCAL_NODE.name());
assertThat(placementDriver.fireEvent(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED,
parameters), willSucceedIn(1, SECONDS));
@@ -192,7 +193,7 @@ public class PartitionReplicaListenerDurableUnlockTest
extends IgniteAbstractTes
return completedFuture(null);
};
- PrimaryReplicaEventParameters parameters = new
PrimaryReplicaEventParameters(0, part0, LOCAL_NODE_NAME);
+ PrimaryReplicaEventParameters parameters = new
PrimaryReplicaEventParameters(0, part0, LOCAL_NODE.name());
assertThat(placementDriver.fireEvent(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED,
parameters), willSucceedIn(1, SECONDS));
@@ -212,7 +213,7 @@ public class PartitionReplicaListenerDurableUnlockTest
extends IgniteAbstractTes
CompletableFuture<ReplicaMeta> primaryReplicaFuture = new
CompletableFuture<>();
placementDriver.setAwaitPrimaryReplicaFunction((groupId, timestamp) ->
primaryReplicaFuture);
- PrimaryReplicaEventParameters parameters = new
PrimaryReplicaEventParameters(0, part0, LOCAL_NODE_NAME);
+ PrimaryReplicaEventParameters parameters = new
PrimaryReplicaEventParameters(0, part0, LOCAL_NODE.name());
assertThat(placementDriver.fireEvent(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED,
parameters), willSucceedIn(1, SECONDS));
assertFalse(txStateStorage.get(tx0).locksReleased());
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index f9f2d1f74a..d56eb38cf1 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -248,7 +248,7 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
localNode,
new AlwaysSyncedSchemaSyncService(),
catalogService,
- new TestPlacementDriver(localNode.name())
+ new TestPlacementDriver(localNode)
);
kvMarshaller = new
ReflectionMarshallerFactory().create(schemaDescriptor, Integer.class,
Integer.class);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 5b56fc8018..462ee6ba38 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -494,7 +494,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
localNode,
schemaSyncService,
catalogService,
- new TestPlacementDriver(localNode.name())
+ new TestPlacementDriver(localNode)
);
kvMarshaller = marshallerFor(schemaDescriptor);
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 6377f9a7ee..8db24cd35b 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -21,6 +21,7 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
+import static org.apache.ignite.internal.util.CollectionUtils.first;
import static
org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
import static org.apache.ignite.utils.ClusterServiceTestUtils.waitForTopology;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -281,7 +282,9 @@ public class ItTxTestCluster {
assertTrue(waitForTopology(node, nodes, 1000));
}
- placementDriver = new TestPlacementDriver(cluster.get(0).nodeName());
+ ClusterNode firstNode = first(cluster).topologyService().localMember();
+
+ placementDriver = new TestPlacementDriver(firstNode);
LOG.info("The cluster has been started");
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index c3d191a568..41800c140c 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -112,7 +112,7 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
public static final ClusterNode LOCAL_NODE = new ClusterNodeImpl("id",
"node", ADDR);
- private static final TestPlacementDriver TEST_PLACEMENT_DRIVER = new
TestPlacementDriver(LOCAL_NODE.name());
+ private static final TestPlacementDriver TEST_PLACEMENT_DRIVER = new
TestPlacementDriver(LOCAL_NODE);
// 2000 was picked to avoid negative time that we get when building read
timestamp
// in TxManagerImpl.currentReadTimestamp.
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index cd3fd04aa8..df09ac291f 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.test.TestTransactionIds;
import org.apache.ignite.lang.ErrorGroups.Transactions;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterNodeImpl;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.tx.TransactionException;
@@ -78,7 +79,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
*/
@ExtendWith(MockitoExtension.class)
public class TxManagerTest extends IgniteAbstractTest {
- private static final NetworkAddress ADDR = new NetworkAddress("127.0.0.1",
2004);
+ private static final ClusterNode LOCAL_NODE = new
ClusterNodeImpl("local_id", "local", new NetworkAddress("127.0.0.1", 2004),
null);
/** Timestamp tracker. */
private HybridTimestampTracker hybridTimestampTracker = new
HybridTimestampTracker();
@@ -101,7 +102,7 @@ public class TxManagerTest extends IgniteAbstractTest {
public void setup() {
clusterService = mock(ClusterService.class, RETURNS_DEEP_STUBS);
-
when(clusterService.topologyService().localMember().address()).thenReturn(ADDR);
+
when(clusterService.topologyService().localMember().address()).thenReturn(LOCAL_NODE.address());
replicaService = mock(ReplicaService.class, RETURNS_DEEP_STUBS);
@@ -112,7 +113,7 @@ public class TxManagerTest extends IgniteAbstractTest {
new HeapLockManager(),
clock,
new TransactionIdGenerator(0xdeadbeef),
- () -> "local",
+ LOCAL_NODE::id,
placementDriver
);
@@ -144,7 +145,7 @@ public class TxManagerTest extends IgniteAbstractTest {
public void testEnlist() {
NetworkAddress addr =
clusterService.topologyService().localMember().address();
- assertEquals(ADDR, addr);
+ assertEquals(LOCAL_NODE.address(), addr);
InternalTransaction tx = txManager.begin(hybridTimestampTracker);
@@ -341,7 +342,7 @@ public class TxManagerTest extends IgniteAbstractTest {
public void testFinishSamePrimary() {
// Same primary that was enlisted is returned during finish phase and
commitTimestamp is less than primary.expirationTimestamp.
when(placementDriver.getPrimaryReplica(any(),
any())).thenReturn(CompletableFuture.completedFuture(
- new TestReplicaMetaImpl("local", hybridTimestamp(1),
HybridTimestamp.MAX_VALUE)));
+ new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1),
HybridTimestamp.MAX_VALUE)));
// Ensure that commit doesn't throw exceptions.
InternalTransaction committedTransaction = prepareTransaction();
@@ -368,7 +369,7 @@ public class TxManagerTest extends IgniteAbstractTest {
// It's impossible from the point of view of getPrimaryReplica to
return an expired lease,
// the given test checks that an assertion error will be thrown and
wrapped with the proper public transaction exception.
when(placementDriver.getPrimaryReplica(any(),
any())).thenReturn(CompletableFuture.completedFuture(
- new TestReplicaMetaImpl("local", hybridTimestamp(1),
hybridTimestamp(10))));
+ new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1),
hybridTimestamp(10))));
InternalTransaction committedTransaction = prepareTransaction();
Throwable throwable =
assertThrowsWithCause(committedTransaction::commit, AssertionError.class);
@@ -387,7 +388,7 @@ public class TxManagerTest extends IgniteAbstractTest {
public void testFinishExpiredWithDifferentEnlistmentConsistencyToken() {
// Primary with another enlistment consistency token is returned.
when(placementDriver.getPrimaryReplica(any(),
any())).thenReturn(CompletableFuture.completedFuture(
- new TestReplicaMetaImpl("local", hybridTimestamp(2),
HybridTimestamp.MAX_VALUE)));
+ new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(2),
HybridTimestamp.MAX_VALUE)));
assertCommitThrowsTransactionExceptionWithPrimaryReplicaExpiredExceptionAsCause();