This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 33582cb1b0 IGNITE-20124 Prevent double storage updates within primary
(#4049)
33582cb1b0 is described below
commit 33582cb1b038a367d6bd5ad9456d7c21ced5c3ad
Author: Alexander Lapin <[email protected]>
AuthorDate: Mon Aug 26 19:03:58 2024 +0300
IGNITE-20124 Prevent double storage updates within primary (#4049)
---
.../ignite/internal/replicator/ReplicaImpl.java | 20 ++-
.../message/PrimaryReplicaChangeCommand.java | 6 +
.../benchmark/AbstractMultiNodeBenchmark.java | 6 +-
.../ignite/internal/benchmark/InsertBenchmark.java | 8 +
.../internal/storage/MvPartitionStorage.java | 22 ++-
.../storage/ThreadAssertingMvPartitionStorage.java | 18 +-
.../storage/AbstractMvPartitionStorageTest.java | 15 +-
.../storage/impl/TestMvPartitionStorage.java | 29 ++-
.../storage/pagememory/StoragePartitionMeta.java | 67 ++++++-
.../pagememory/StoragePartitionMetaFactory.java | 2 +
.../storage/pagememory/StoragePartitionMetaIo.java | 55 +++++-
.../mv/PersistentPageMemoryMvPartitionStorage.java | 124 ++++++++++++-
.../mv/VolatilePageMemoryMvPartitionStorage.java | 32 +++-
.../StoragePartitionMetaManagerTest.java | 2 +-
.../pagememory/StoragePartitionMetaTest.java | 2 +-
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 73 ++++++--
.../storage/rocksdb/RocksDbMvTableStorageTest.java | 9 +
.../ReplicasSafeTimePropagationTest.java | 3 +-
.../table/distributed/StorageUpdateHandler.java | 1 -
.../internal/table/distributed/TableManager.java | 3 +-
.../distributed/raft/PartitionDataStorage.java | 22 ++-
.../table/distributed/raft/PartitionListener.java | 141 +++++++++------
.../SnapshotAwarePartitionDataStorage.java | 18 +-
.../replicator/PartitionReplicaListener.java | 196 ++++++++-------------
.../raft/PartitionCommandListenerTest.java | 65 +++++--
.../replication/PartitionReplicaListenerTest.java | 6 +-
.../apache/ignite/distributed/ItTxTestCluster.java | 38 +++-
.../distributed/TestPartitionDataStorage.java | 18 +-
.../table/impl/DummyInternalTableImpl.java | 38 +++-
.../ignite/internal/tx/UpdateCommandResult.java | 48 +++--
30 files changed, 832 insertions(+), 255 deletions(-)
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaImpl.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaImpl.java
index 7f46cd276e..5c7bb1f9c0 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaImpl.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaImpl.java
@@ -218,7 +218,11 @@ public class ReplicaImpl implements Replica {
// group leader are received.
return waitForActualState(msg.leaseStartTime(),
msg.leaseExpirationTime().getPhysical())
- .thenCompose(v ->
sendPrimaryReplicaChangeToReplicationGroup(msg.leaseStartTime().longValue()))
+ .thenCompose(v ->
sendPrimaryReplicaChangeToReplicationGroup(
+ msg.leaseStartTime().longValue(),
+ localNode.id(),
+ localNode.name()
+ ))
.thenCompose(v -> {
CompletableFuture<LeaseGrantedMessageResponse>
respFut =
acceptLease(msg.leaseStartTime(),
msg.leaseExpirationTime());
@@ -233,7 +237,11 @@ public class ReplicaImpl implements Replica {
} else {
if (leader.equals(localNode)) {
return waitForActualState(msg.leaseStartTime(),
msg.leaseExpirationTime().getPhysical())
- .thenCompose(v ->
sendPrimaryReplicaChangeToReplicationGroup(msg.leaseStartTime().longValue()))
+ .thenCompose(v ->
sendPrimaryReplicaChangeToReplicationGroup(
+ msg.leaseStartTime().longValue(),
+ localNode.id(),
+ localNode.name()
+ ))
.thenCompose(v ->
acceptLease(msg.leaseStartTime(), msg.leaseExpirationTime()));
} else {
return proposeLeaseRedirect(leader);
@@ -242,9 +250,15 @@ public class ReplicaImpl implements Replica {
}));
}
- private CompletableFuture<Void>
sendPrimaryReplicaChangeToReplicationGroup(long leaseStartTime) {
+ private CompletableFuture<Void> sendPrimaryReplicaChangeToReplicationGroup(
+ long leaseStartTime,
+ String primaryReplicaNodeId,
+ String primaryReplicaNodeName
+ ) {
PrimaryReplicaChangeCommand cmd =
REPLICA_MESSAGES_FACTORY.primaryReplicaChangeCommand()
.leaseStartTime(leaseStartTime)
+ .primaryReplicaNodeId(primaryReplicaNodeId)
+ .primaryReplicaNodeName(primaryReplicaNodeName)
.build();
return raftClient.run(cmd);
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/PrimaryReplicaChangeCommand.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/PrimaryReplicaChangeCommand.java
index 941bd3e2d5..9511a68e2e 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/PrimaryReplicaChangeCommand.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/PrimaryReplicaChangeCommand.java
@@ -30,4 +30,10 @@ import org.apache.ignite.internal.raft.WriteCommand;
public interface PrimaryReplicaChangeCommand extends WriteCommand {
/** Lease start time, hybrid timestamp as long, see {@link
HybridTimestamp#longValue()}. */
long leaseStartTime();
+
+ /** Primary replica node id. */
+ String primaryReplicaNodeId();
+
+ /** Primary replica node name. */
+ String primaryReplicaNodeName();
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
index 93a011988c..23678e9954 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
@@ -86,7 +86,7 @@ public class AbstractMultiNodeBenchmark {
var queryEngine = igniteImpl.queryEngine();
var createZoneStatement = "CREATE ZONE IF NOT EXISTS " + ZONE_NAME
+ " WITH partitions=" + partitionCount()
- + ", storage_profiles ='" + DEFAULT_STORAGE_PROFILE + "'";
+ + ", replicas=" + replicaCount() + ", storage_profiles ='"
+ DEFAULT_STORAGE_PROFILE + "'";
getAllFromCursor(
await(queryEngine.queryAsync(
@@ -244,4 +244,8 @@ public class AbstractMultiNodeBenchmark {
protected int partitionCount() {
return CatalogUtils.DEFAULT_PARTITION_COUNT;
}
+
+ protected int replicaCount() {
+ return CatalogUtils.DEFAULT_REPLICA_COUNT;
+ }
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java
index a418b45f67..edc021ea3b 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java
@@ -68,6 +68,9 @@ public class InsertBenchmark extends
AbstractMultiNodeBenchmark {
@Param({"1", "2", "4", "8", "16", "32"})
private int partitionCount;
+ @Param({"1", "2", "3"})
+ private int replicaCount;
+
/**
* Benchmark for SQL insert via embedded client.
*/
@@ -408,4 +411,9 @@ public class InsertBenchmark extends
AbstractMultiNodeBenchmark {
protected int partitionCount() {
return partitionCount;
}
+
+ @Override
+ protected int replicaCount() {
+ return replicaCount;
+ }
}
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index a7daeebd92..31c7b5a522 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -274,8 +274,14 @@ public interface MvPartitionStorage extends
ManuallyCloseable {
* Updates the current lease start time in the storage.
*
* @param leaseStartTime Lease start time.
+ * @param primaryReplicaNodeId Primary replica node id.
+ * @param primaryReplicaNodeName Primary replica node name.
*/
- void updateLease(long leaseStartTime);
+ void updateLease(
+ long leaseStartTime,
+ String primaryReplicaNodeId,
+ String primaryReplicaNodeName
+ );
/**
* Returns the start time of the known lease for this replication group.
@@ -284,6 +290,20 @@ public interface MvPartitionStorage extends
ManuallyCloseable {
*/
long leaseStartTime();
+ /**
+ * Returns the primary replica node id of the known lease for this replication group.
+ *
+ * @return Primary replica node id or null if there is no information
about the lease in the storage.
+ */
+ @Nullable String primaryReplicaNodeId();
+
+ /**
+ * Returns the primary replica node name of the known lease for this replication group.
+ *
+ * @return Primary replica node name or null if there is no information
about the lease in the storage.
+ */
+ @Nullable String primaryReplicaNodeName();
+
/**
* Returns the <em>estimated size</em> of this partition.
*
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
index 9d0350d38a..50b40fd5a2 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
@@ -155,10 +155,14 @@ public class ThreadAssertingMvPartitionStorage implements
MvPartitionStorage, Wr
}
@Override
- public void updateLease(long leaseStartTime) {
+ public void updateLease(
+ long leaseStartTime,
+ String primaryReplicaNodeId,
+ String primaryReplicaNodeName
+ ) {
assertThreadAllowsToWrite();
- partitionStorage.updateLease(leaseStartTime);
+ partitionStorage.updateLease(leaseStartTime, primaryReplicaNodeId,
primaryReplicaNodeName);
}
@Override
@@ -166,6 +170,16 @@ public class ThreadAssertingMvPartitionStorage implements
MvPartitionStorage, Wr
return partitionStorage.leaseStartTime();
}
+ @Override
+ public @Nullable String primaryReplicaNodeId() {
+ return partitionStorage.primaryReplicaNodeId();
+ }
+
+ @Override
+ public @Nullable String primaryReplicaNodeName() {
+ return partitionStorage.primaryReplicaNodeName();
+ }
+
@Override
public long estimatedSize() {
return partitionStorage.estimatedSize();
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 252cde5c11..675aec6257 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -1375,20 +1375,29 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvPartitionStor
public void testLease() {
storage.runConsistently(locker -> {
long lst0 = 1000;
+ String nodeId0 = UUID.randomUUID().toString();
long lst1 = 2000;
+ String nodeId1 = UUID.randomUUID().toString();
- storage.updateLease(lst0);
+ storage.updateLease(lst0, nodeId0, nodeId0 + "name");
assertEquals(lst0, storage.leaseStartTime());
+ assertEquals(nodeId0, storage.primaryReplicaNodeId());
+ assertEquals(nodeId0 + "name", storage.primaryReplicaNodeName());
- storage.updateLease(lst1);
+ storage.updateLease(lst1, nodeId1, nodeId1 + "name");
assertEquals(lst1, storage.leaseStartTime());
+ assertEquals(nodeId1, storage.primaryReplicaNodeId());
+ assertEquals(nodeId1 + "name", storage.primaryReplicaNodeName());
- storage.updateLease(0);
+ String nodeIdRandom = UUID.randomUUID().toString();
+ storage.updateLease(0, nodeIdRandom, nodeIdRandom + "name");
assertEquals(lst1, storage.leaseStartTime());
+ assertEquals(nodeId1, storage.primaryReplicaNodeId());
+ assertEquals(nodeId1 + "name", storage.primaryReplicaNodeName());
return null;
});
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index 1d979d2776..4c48dc67a7 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -70,7 +70,12 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
private volatile long lastAppliedTerm;
- private volatile long leaseStartTime =
HybridTimestamp.MIN_VALUE.longValue();
+ // -1 is used as an initial value, in order not to clash with {@code
ReplicaMeta.getStartTime}
+ private volatile long leaseStartTime = -1;
+
+ private volatile String primaryReplicaNodeId;
+
+ private volatile String primaryReplicaNodeName;
private volatile long estimatedSize;
@@ -633,7 +638,11 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
}
@Override
- public synchronized void updateLease(long leaseStartTime) {
+ public synchronized void updateLease(
+ long leaseStartTime,
+ String primaryReplicaNodeId,
+ String primaryReplicaNodeName
+ ) {
checkStorageClosed();
if (leaseStartTime <= this.leaseStartTime) {
@@ -641,6 +650,8 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
}
this.leaseStartTime = leaseStartTime;
+ this.primaryReplicaNodeId = primaryReplicaNodeId;
+ this.primaryReplicaNodeName = primaryReplicaNodeName;
}
@Override
@@ -650,6 +661,20 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
return leaseStartTime;
}
+ @Override
+ public @Nullable String primaryReplicaNodeId() {
+ checkStorageClosed();
+
+ return primaryReplicaNodeId;
+ }
+
+ @Override
+ public @Nullable String primaryReplicaNodeName() {
+ checkStorageClosed();
+
+ return primaryReplicaNodeName;
+ }
+
@Override
public long estimatedSize() {
checkStorageClosed();
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMeta.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMeta.java
index 7e4d94030d..bfeeb4e242 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMeta.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMeta.java
@@ -43,6 +43,10 @@ public class StoragePartitionMeta extends PartitionMeta {
private volatile long leaseStartTime;
+ private volatile long primaryReplicaNodeIdFirstPageId;
+
+ private volatile long primaryReplicaNodeNameFirstPageId;
+
private volatile long freeListRootPageId;
private volatile long versionChainTreeRootPageId;
@@ -59,8 +63,11 @@ public class StoragePartitionMeta extends PartitionMeta {
* @param pageCount Count of pages in the partition.
* @param lastAppliedIndex Last applied index value.
* @param lastAppliedTerm Last applied term value.
- * @param lastReplicationProtocolGroupConfigFirstPageId ID of the first
page in a chain storing a blob representing.
+ * @param lastReplicationProtocolGroupConfigFirstPageId ID of the first
page in a chain storing a blob representing last replication
+ * protocol group config.
* @param leaseStartTime Lease start time.
+ * @param primaryReplicaNodeIdFirstPageId ID of the first page in a chain
storing a blob representing a primary replica node id.
+ * @param primaryReplicaNodeNameFirstPageId ID of the first page in a
chain storing a blob representing a primary replica node name.
* @param freeListRootPageId Free list root page ID.
* @param versionChainTreeRootPageId Version chain tree root page ID.
* @param indexTreeMetaPageId Index tree meta page ID.
@@ -73,6 +80,8 @@ public class StoragePartitionMeta extends PartitionMeta {
long lastAppliedTerm,
long lastReplicationProtocolGroupConfigFirstPageId,
long leaseStartTime,
+ long primaryReplicaNodeIdFirstPageId,
+ long primaryReplicaNodeNameFirstPageId,
long freeListRootPageId,
long versionChainTreeRootPageId,
long indexTreeMetaPageId,
@@ -84,6 +93,8 @@ public class StoragePartitionMeta extends PartitionMeta {
this.lastAppliedTerm = lastAppliedTerm;
this.lastReplicationProtocolGroupConfigFirstPageId =
lastReplicationProtocolGroupConfigFirstPageId;
this.leaseStartTime = leaseStartTime;
+ this.primaryReplicaNodeIdFirstPageId = primaryReplicaNodeIdFirstPageId;
+ this.primaryReplicaNodeNameFirstPageId =
primaryReplicaNodeNameFirstPageId;
this.freeListRootPageId = freeListRootPageId;
this.versionChainTreeRootPageId = versionChainTreeRootPageId;
this.indexTreeMetaPageId = indexTreeMetaPageId;
@@ -264,6 +275,8 @@ public class StoragePartitionMeta extends PartitionMeta {
gcQueueMetaPageId,
pageCount(),
leaseStartTime,
+ primaryReplicaNodeIdFirstPageId,
+ primaryReplicaNodeNameFirstPageId,
estimatedSize
);
}
@@ -287,10 +300,6 @@ public class StoragePartitionMeta extends PartitionMeta {
public void updateLease(@Nullable UUID checkpointId, long leaseStartTime) {
updateSnapshot(checkpointId);
- if (leaseStartTime <= this.leaseStartTime) {
- return;
- }
-
this.leaseStartTime = leaseStartTime;
}
@@ -303,6 +312,44 @@ public class StoragePartitionMeta extends PartitionMeta {
return leaseStartTime;
}
+ /**
+ * Returns ID of the first page in a chain storing a blob representing
primary replica node id.
+ */
+ public long primaryReplicaNodeIdFirstPageId() {
+ return primaryReplicaNodeIdFirstPageId;
+ }
+
+ /**
+ * Sets ID of the first page in a chain storing a blob representing
primary replica node id.
+ *
+ * @param checkpointId Checkpoint ID.
+ * @param pageId Page ID.
+ */
+ public void primaryReplicaNodeIdFirstPageId(@Nullable UUID checkpointId,
long pageId) {
+ updateSnapshot(checkpointId);
+
+ this.primaryReplicaNodeIdFirstPageId = pageId;
+ }
+
+ /**
+ * Returns ID of the first page in a chain storing a blob representing
primary replica node name.
+ */
+ public long primaryReplicaNodeNameFirstPageId() {
+ return primaryReplicaNodeNameFirstPageId;
+ }
+
+ /**
+ * Sets ID of the first page in a chain storing a blob representing
primary replica node name.
+ *
+ * @param checkpointId Checkpoint ID.
+ * @param pageId Page ID.
+ */
+ public void primaryReplicaNodeNameFirstPageId(@Nullable UUID checkpointId,
long pageId) {
+ updateSnapshot(checkpointId);
+
+ this.primaryReplicaNodeNameFirstPageId = pageId;
+ }
+
/**
* An immutable snapshot of the partition's meta information.
*/
@@ -327,6 +374,10 @@ public class StoragePartitionMeta extends PartitionMeta {
private final long leaseStartTime;
+ private final long primaryReplicaNodeIdFirstPageId;
+
+ private final long primaryReplicaNodeNameFirstPageId;
+
private final long estimatedSize;
private StoragePartitionMetaSnapshot(
@@ -340,6 +391,8 @@ public class StoragePartitionMeta extends PartitionMeta {
long gcQueueMetaPageId,
int pageCount,
long leaseStartTime,
+ long primaryReplicaNodeIdFistPageId,
+ long primaryReplicaNodeNameFistPageId,
long estimatedSize
) {
this.checkpointId = checkpointId;
@@ -352,6 +405,8 @@ public class StoragePartitionMeta extends PartitionMeta {
this.gcQueueMetaPageId = gcQueueMetaPageId;
this.pageCount = pageCount;
this.leaseStartTime = leaseStartTime;
+ this.primaryReplicaNodeIdFirstPageId =
primaryReplicaNodeIdFistPageId;
+ this.primaryReplicaNodeNameFirstPageId =
primaryReplicaNodeNameFistPageId;
this.estimatedSize = estimatedSize;
}
@@ -443,6 +498,8 @@ public class StoragePartitionMeta extends PartitionMeta {
storageMetaIo.setGcQueueMetaPageId(pageAddr, gcQueueMetaPageId);
storageMetaIo.setPageCount(pageAddr, pageCount);
storageMetaIo.setLeaseStartTime(pageAddr, leaseStartTime);
+ storageMetaIo.setPrimaryReplicaNodeIdFirstPageId(pageAddr,
primaryReplicaNodeIdFirstPageId);
+ storageMetaIo.setPrimaryReplicaNodeNameFirstPageId(pageAddr,
primaryReplicaNodeNameFirstPageId);
storageMetaIo.setEstimatedSize(pageAddr, estimatedSize);
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaFactory.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaFactory.java
index 0f791b8f7c..8f39e66043 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaFactory.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaFactory.java
@@ -36,6 +36,8 @@ public class StoragePartitionMetaFactory implements
PartitionMetaFactory {
metaIo.getLastAppliedTerm(pageAddr),
metaIo.getLastReplicationProtocolGroupConfigFirstPageId(pageAddr),
metaIo.getLeaseStartTime(pageAddr),
+ metaIo.getPrimaryReplicaNodeIdFirstPageId(pageAddr),
+ metaIo.getPrimaryReplicaNodeNameFirstPageId(pageAddr),
StoragePartitionMetaIo.getFreeListRootPageId(pageAddr),
metaIo.getVersionChainTreeRootPageId(pageAddr),
metaIo.getIndexTreeMetaPageId(pageAddr),
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaIo.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaIo.java
index 20e3c62bec..9b7da9c977 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaIo.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaIo.java
@@ -48,8 +48,12 @@ public class StoragePartitionMetaIo extends PartitionMetaIo {
private static final int LEASE_START_TIME_OFF = GC_QUEUE_META_PAGE_ID_OFF
+ Long.BYTES;
- private static final int ESTIMATED_SIZE_OFF = LEASE_START_TIME_OFF +
Long.BYTES;
+ private static final int PRIMARY_REPLICA_NODE_ID_FIRST_PAGE_ID_OFF =
LEASE_START_TIME_OFF + Long.BYTES;
+ private static final int PRIMARY_REPLICA_NODE_NAME_FIRST_PAGE_ID_OFF =
PRIMARY_REPLICA_NODE_ID_FIRST_PAGE_ID_OFF + Long.BYTES;
+
+ /** Estimated size here is not the size of the meta, but an approximate row
count. */
+ private static final int ESTIMATED_SIZE_OFF =
PRIMARY_REPLICA_NODE_NAME_FIRST_PAGE_ID_OFF + Long.BYTES;
/** I/O versions. */
public static final IoVersions<StoragePartitionMetaIo> VERSIONS = new
IoVersions<>(new StoragePartitionMetaIo(1));
@@ -77,6 +81,8 @@ public class StoragePartitionMetaIo extends PartitionMetaIo {
setGcQueueMetaPageId(pageAddr, 0);
setPageCount(pageAddr, 0);
setLeaseStartTime(pageAddr, HybridTimestamp.MIN_VALUE.longValue());
+ setPrimaryReplicaNodeIdFirstPageId(pageAddr, 0);
+ setPrimaryReplicaNodeNameFirstPageId(pageAddr, 0);
setEstimatedSize(pageAddr, 0);
}
@@ -249,6 +255,51 @@ public class StoragePartitionMetaIo extends
PartitionMetaIo {
return getLong(pageAddr, LEASE_START_TIME_OFF);
}
+ /**
+ * Sets the primary replica node id first page id.
+ *
+ * @param pageAddr Page address.
+ * @param primaryReplicaNodeIdFirstPageId Primary replica node id first
page id.
+ */
+ public void setPrimaryReplicaNodeIdFirstPageId(long pageAddr, long
primaryReplicaNodeIdFirstPageId) {
+ assertPageType(pageAddr);
+
+ putLong(pageAddr, PRIMARY_REPLICA_NODE_ID_FIRST_PAGE_ID_OFF,
primaryReplicaNodeIdFirstPageId);
+ }
+
+ /**
+ * Returns the primary replica node id first page id.
+ *
+ * @param pageAddr Page address.
+ * @return Primary replica node id first page id.
+ */
+ public long getPrimaryReplicaNodeIdFirstPageId(long pageAddr) {
+ return getLong(pageAddr, PRIMARY_REPLICA_NODE_ID_FIRST_PAGE_ID_OFF);
+ }
+
+
+ /**
+ * Sets the primary replica node name first page id.
+ *
+ * @param pageAddr Page address.
+ * @param primaryReplicaNodeNameFirstPageId Primary replica node name
first page id.
+ */
+ public void setPrimaryReplicaNodeNameFirstPageId(long pageAddr, long
primaryReplicaNodeNameFirstPageId) {
+ assertPageType(pageAddr);
+
+ putLong(pageAddr, PRIMARY_REPLICA_NODE_NAME_FIRST_PAGE_ID_OFF,
primaryReplicaNodeNameFirstPageId);
+ }
+
+ /**
+ * Returns the primary replica node name first page id.
+ *
+ * @param pageAddr Page address.
+ * @return Primary replica node name first page id.
+ */
+ public long getPrimaryReplicaNodeNameFirstPageId(long pageAddr) {
+ return getLong(pageAddr, PRIMARY_REPLICA_NODE_NAME_FIRST_PAGE_ID_OFF);
+ }
+
/**
* Sets the estimated size of this partition.
*
@@ -282,6 +333,8 @@ public class StoragePartitionMetaIo extends PartitionMetaIo
{
.app("gcQueueMetaPageId=").appendHex(getGcQueueMetaPageId(addr)).nl()
.app("pageCount=").app(getPageCount(addr)).nl()
.app("leaseStartTime=").app(getLeaseStartTime(addr)).nl()
+
.app("primaryReplicaNodeIdFirstPageId=").app(getPrimaryReplicaNodeIdFirstPageId(addr)).nl()
+
.app("primaryReplicaNodeNameFirstPageId=").app(getPrimaryReplicaNodeNameFirstPageId(addr)).nl()
.app("estimatedSize=").app(getEstimatedSize(addr)).nl()
.app(']');
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
index 296eb3af35..5fc99d0baa 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.storage.pagememory.mv;
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInCleanupOrRebalancedState;
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInProgressOfRebalance;
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInRunnableOrRebalanceState;
+import static org.apache.ignite.internal.util.ByteUtils.stringToBytes;
import java.util.List;
import java.util.UUID;
@@ -52,6 +53,7 @@ import
org.apache.ignite.internal.storage.pagememory.index.meta.IndexMetaTree;
import
org.apache.ignite.internal.storage.pagememory.index.sorted.PageMemorySortedIndexStorage;
import org.apache.ignite.internal.storage.pagememory.mv.gc.GcQueue;
import org.apache.ignite.internal.storage.util.LocalLocker;
+import org.apache.ignite.internal.util.ByteUtils;
import org.jetbrains.annotations.Nullable;
/**
@@ -75,6 +77,19 @@ public class PersistentPageMemoryMvPartitionStorage extends
AbstractPageMemoryMv
/** Lock that protects group config read/write. */
private final ReadWriteLock replicationProtocolGroupConfigReadWriteLock =
new ReentrantReadWriteLock();
+ /** Lock that protects group primary replica meta read/write. */
+ private final ReadWriteLock primaryReplicaMetaReadWriteLock = new
ReentrantReadWriteLock();
+
+ /**
+ * Cached primary replica node id in order not to touch blobStorage each
time. Guarded by primaryReplicaMetaReadWriteLock.
+ */
+ private String primaryReplicaNodeId;
+
+ /**
+ * Cached primary replica node name in order not to touch blobStorage each
time. Guarded by primaryReplicaMetaReadWriteLock.
+ */
+ private String primaryReplicaNodeName;
+
/**
* Constructor.
*
@@ -303,11 +318,50 @@ public class PersistentPageMemoryMvPartitionStorage
extends AbstractPageMemoryMv
}
@Override
- public void updateLease(long leaseStartTime) {
+ public void updateLease(
+ long leaseStartTime,
+ String primaryReplicaNodeId,
+ String primaryReplicaNodeName
+ ) {
busy(() -> {
throwExceptionIfStorageNotInRunnableState();
- updateMeta((lastCheckpointId, meta) ->
meta.updateLease(lastCheckpointId, leaseStartTime));
+ updateMeta((lastCheckpointId, meta) -> {
+ primaryReplicaMetaReadWriteLock.writeLock().lock();
+ try {
+ if (leaseStartTime <= meta.leaseStartTime()) {
+ return;
+ }
+
+ if (meta.primaryReplicaNodeIdFirstPageId() ==
BlobStorage.NO_PAGE_ID) {
+ long primaryReplicaNodeIdFirstPageId =
blobStorage.addBlob(stringToBytes(primaryReplicaNodeId));
+
+ meta.primaryReplicaNodeIdFirstPageId(lastCheckpointId,
primaryReplicaNodeIdFirstPageId);
+ } else {
+
blobStorage.updateBlob(meta.primaryReplicaNodeIdFirstPageId(),
stringToBytes(primaryReplicaNodeId));
+ }
+ if (meta.primaryReplicaNodeNameFirstPageId() ==
BlobStorage.NO_PAGE_ID) {
+ long primaryReplicaNodeNameFirstPageId =
blobStorage.addBlob(stringToBytes(primaryReplicaNodeName));
+
+
meta.primaryReplicaNodeNameFirstPageId(lastCheckpointId,
primaryReplicaNodeNameFirstPageId);
+ } else {
+
blobStorage.updateBlob(meta.primaryReplicaNodeNameFirstPageId(),
stringToBytes(primaryReplicaNodeName));
+ }
+
+ meta.updateLease(lastCheckpointId, leaseStartTime);
+
+ this.primaryReplicaNodeId = primaryReplicaNodeId;
+ this.primaryReplicaNodeName = primaryReplicaNodeName;
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException(
+ "Cannot save lease meta: [tableId={},
partitionId={}]",
+ e,
+ tableStorage.getTableId(), partitionId
+ );
+ } finally {
+ primaryReplicaMetaReadWriteLock.writeLock().unlock();
+ }
+ });
return null;
});
@@ -322,6 +376,72 @@ public class PersistentPageMemoryMvPartitionStorage
extends AbstractPageMemoryMv
});
}
+ // TODO https://issues.apache.org/jira/browse/IGNITE-15119 nodeId type
should be changed from String to UUID; after the fix
+ // TODO nodeId will be stored in meta directly rather than in the blob storage.
+ @Override
+ public @Nullable String primaryReplicaNodeId() {
+ return busy(() -> {
+ throwExceptionIfStorageNotInRunnableState();
+ primaryReplicaMetaReadWriteLock.readLock().lock();
+
+ try {
+ if (primaryReplicaNodeId == null) {
+ long primaryReplicaNodeIdFirstPageId =
meta.primaryReplicaNodeIdFirstPageId();
+
+ // It's possible to face BlobStorage.NO_PAGE_ID if lease
information has not yet been recorded in the storage,
+ // for example, if the lease itself has not yet been
granted.
+ if (primaryReplicaNodeIdFirstPageId !=
BlobStorage.NO_PAGE_ID) {
+ primaryReplicaNodeId =
ByteUtils.stringFromBytes(blobStorage.readBlob(primaryReplicaNodeIdFirstPageId));
+ }
+ }
+
+ return primaryReplicaNodeId;
+
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException(
+ "Failed to read primary replica node id: [tableId={},
partitionId={}]",
+ e,
+ tableStorage.getTableId(), partitionId
+ );
+ } finally {
+ primaryReplicaMetaReadWriteLock.readLock().unlock();
+ }
+
+ });
+ }
+
+ @Override
+ public @Nullable String primaryReplicaNodeName() {
+ return busy(() -> {
+ throwExceptionIfStorageNotInRunnableState();
+
+ primaryReplicaMetaReadWriteLock.readLock().lock();
+
+ try {
+ if (primaryReplicaNodeName == null) {
+ long primaryReplicaNodeNameFirstPageId =
meta.primaryReplicaNodeNameFirstPageId();
+
+ // It's possible to face BlobStorage.NO_PAGE_ID if lease
information has not yet been recorded in the storage,
+ // for example, if the lease itself has not yet been
granted.
+ if (primaryReplicaNodeNameFirstPageId !=
BlobStorage.NO_PAGE_ID) {
+ primaryReplicaNodeName =
ByteUtils.stringFromBytes(blobStorage.readBlob(primaryReplicaNodeNameFirstPageId));
+ }
+ }
+
+ return primaryReplicaNodeName;
+
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException(
+ "Failed to read primary replica node name:
[tableId={}, partitionId={}]",
+ e,
+ tableStorage.getTableId(), partitionId
+ );
+ } finally {
+ primaryReplicaMetaReadWriteLock.readLock().unlock();
+ }
+ });
+ }
+
private void committedGroupConfigurationBusy(byte[] groupConfigBytes) {
updateMeta((lastCheckpointId, meta) -> {
replicationProtocolGroupConfigReadWriteLock.writeLock().lock();
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
index d9330b1b90..7a50600651 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
@@ -68,6 +68,12 @@ public class VolatilePageMemoryMvPartitionStorage extends
AbstractPageMemoryMvPa
/** Lease start time. */
private volatile long leaseStartTime;
+ /** Primary replica node id. */
+ private volatile String primaryReplicaNodeId;
+
+ /** Primary replica node name. */
+ private volatile String primaryReplicaNodeName;
+
/** Last group configuration. */
private volatile byte @Nullable [] groupConfig;
@@ -197,7 +203,11 @@ public class VolatilePageMemoryMvPartitionStorage extends
AbstractPageMemoryMvPa
}
@Override
- public void updateLease(long leaseStartTime) {
+ public void updateLease(
+ long leaseStartTime,
+ String primaryReplicaNodeId,
+ String primaryReplicaNodeName
+ ) {
busy(() -> {
throwExceptionIfStorageNotInRunnableState();
@@ -206,6 +216,8 @@ public class VolatilePageMemoryMvPartitionStorage extends
AbstractPageMemoryMvPa
}
this.leaseStartTime = leaseStartTime;
+ this.primaryReplicaNodeId = primaryReplicaNodeId;
+ this.primaryReplicaNodeName = primaryReplicaNodeName;
return null;
});
@@ -220,6 +232,24 @@ public class VolatilePageMemoryMvPartitionStorage extends
AbstractPageMemoryMvPa
});
}
+ @Override
+ public @Nullable String primaryReplicaNodeId() {
+ return busy(() -> {
+ throwExceptionIfStorageNotInRunnableState();
+
+ return primaryReplicaNodeId;
+ });
+ }
+
+ @Override
+ public @Nullable String primaryReplicaNodeName() {
+ return busy(() -> {
+ throwExceptionIfStorageNotInRunnableState();
+
+ return primaryReplicaNodeName;
+ });
+ }
+
@Override
public void lastAppliedOnRebalance(long lastAppliedIndex, long
lastAppliedTerm) {
throwExceptionIfStorageNotInProgressOfRebalance(state.get(),
this::createStorageInfo);
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaManagerTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaManagerTest.java
index 5a731c0f6f..678dd723e7 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaManagerTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaManagerTest.java
@@ -129,7 +129,7 @@ public class StoragePartitionMetaManagerTest extends
BaseIgniteAbstractTest {
try (FilePageStore filePageStore =
createFilePageStore(testFilePath)) {
manager.writeMetaToBuffer(
partId,
- new StoragePartitionMeta(4, 100, 10, 34, 1000, 900,
300, 200, 400, 200)
+ new StoragePartitionMeta(4, 100, 10, 34, 1000, 11, 12,
900, 300, 200, 400, 200)
.init(null)
.metaSnapshot(null),
buffer.rewind()
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaTest.java
index 62195c21a8..01ca7ce261 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaTest.java
@@ -213,7 +213,7 @@ public class StoragePartitionMetaTest {
}
private static StoragePartitionMeta createMeta() {
- return new StoragePartitionMeta(0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
+ return new StoragePartitionMeta(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
.init(null);
}
}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 15e1a33661..773ecc99c4 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -48,12 +48,17 @@ import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptio
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageStateOnRebalance;
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
import static
org.apache.ignite.internal.storage.util.StorageUtils.transitionToTerminalState;
+import static org.apache.ignite.internal.util.ByteUtils.bytesToInt;
import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
+import static org.apache.ignite.internal.util.ByteUtils.intToBytes;
import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
-import static org.apache.ignite.internal.util.ByteUtils.putLongToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.stringToBytes;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.UUID;
@@ -198,6 +203,12 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
/** On-heap-cached lease start time value. */
private volatile long leaseStartTime;
+ /** On-heap-cached lease node id. */
+ private volatile String primaryReplicaNodeId;
+
+ /** On-heap-cached lease node name. */
+ private volatile String primaryReplicaNodeName;
+
/** On-heap-cached last committed group configuration. */
private volatile byte @Nullable [] lastGroupConfig;
@@ -241,12 +252,29 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
lastGroupConfig = db.get(meta, readOpts, lastGroupConfigKey);
- byte[] leaseStartTimeBytes = db.get(meta, readOpts, leaseKey);
- ByteBuffer leaseStartTimeBuf = leaseStartTimeBytes == null
- ? null
- :
ByteBuffer.wrap(leaseStartTimeBytes).order(ByteOrder.LITTLE_ENDIAN);
+ byte[] leaseBytes = db.get(meta, readOpts, leaseKey);
- leaseStartTime = leaseStartTimeBuf == null ?
HybridTimestamp.MIN_VALUE.longValue() : leaseStartTimeBuf.getLong();
+ if (leaseBytes == null) {
+ leaseStartTime = HybridTimestamp.MIN_VALUE.longValue();
+ } else {
+ leaseStartTime = bytesToLong(leaseBytes);
+
+ int primaryReplicaNodeIdLength = bytesToInt(leaseBytes,
Long.BYTES);
+ primaryReplicaNodeId = new String(
+ leaseBytes,
+ Long.BYTES + Integer.BYTES,
+ primaryReplicaNodeIdLength,
+ StandardCharsets.UTF_8
+ );
+
+ int primaryReplicaNodeNameLength = bytesToInt(leaseBytes,
Long.BYTES + Integer.BYTES + primaryReplicaNodeIdLength);
+ primaryReplicaNodeName = new String(
+ leaseBytes,
+ Long.BYTES + Integer.BYTES +
primaryReplicaNodeIdLength + Integer.BYTES,
+ primaryReplicaNodeNameLength,
+ StandardCharsets.UTF_8
+ );
+ }
byte[] estimatedSizeBytes = db.get(meta, readOpts,
estimatedSizeKey);
@@ -1070,7 +1098,11 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
}
@Override
- public void updateLease(long leaseStartTime) {
+ public void updateLease(
+ long leaseStartTime,
+ String primaryReplicaNodeId,
+ String primaryReplicaNodeName
+ ) {
busy(() -> {
if (leaseStartTime <= this.leaseStartTime) {
return null;
@@ -1079,14 +1111,23 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
AbstractWriteBatch writeBatch = requireWriteBatch();
try {
- byte[] leaseBytes = new byte[Long.BYTES];
+ ByteArrayOutputStream outputStream = new
ByteArrayOutputStream();
+ outputStream.write(longToBytes(leaseStartTime));
+
+ byte[] primaryReplicaNodeIdBytes =
stringToBytes(primaryReplicaNodeId);
+
outputStream.write(intToBytes(primaryReplicaNodeIdBytes.length));
+ outputStream.write(primaryReplicaNodeIdBytes);
- putLongToBytes(leaseStartTime, leaseBytes, 0);
+ byte[] primaryReplicaNodeNameBytes =
stringToBytes(primaryReplicaNodeName);
+
outputStream.write(intToBytes(primaryReplicaNodeNameBytes.length));
+ outputStream.write(primaryReplicaNodeNameBytes);
- writeBatch.put(meta, leaseKey, leaseBytes);
+ writeBatch.put(meta, leaseKey, outputStream.toByteArray());
this.leaseStartTime = leaseStartTime;
- } catch (RocksDBException e) {
+ this.primaryReplicaNodeId = primaryReplicaNodeId;
+ this.primaryReplicaNodeName = primaryReplicaNodeName;
+ } catch (RocksDBException | IOException e) {
throw new StorageException(e);
}
@@ -1099,6 +1140,16 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
return busy(() -> leaseStartTime);
}
+ @Override
+ public @Nullable String primaryReplicaNodeId() {
+ return busy(() -> primaryReplicaNodeId);
+ }
+
+ @Override
+ public @Nullable String primaryReplicaNodeName() {
+ return busy(() -> primaryReplicaNodeName);
+ }
+
/**
* Deletes partition data from the storage, using write batch to perform
the operation.
*/
diff --git
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
index 80f0e91efd..26ba21b514 100644
---
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
+++
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
@@ -23,6 +23,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import java.nio.file.Path;
@@ -137,10 +138,14 @@ public class RocksDbMvTableStorageTest extends
AbstractMvTableStorageTest {
MvPartitionStorage partitionStorage0 =
getOrCreateMvPartition(PARTITION_ID);
RowId rowId0 = new RowId(PARTITION_ID);
+ long leaseStartTime = 1234567;
+ String primaryReplicaNodeId = UUID.randomUUID().toString();
+ String primaryReplicaNodeName = primaryReplicaNodeId + "name";
partitionStorage0.runConsistently(locker -> {
locker.lock(rowId0);
+ partitionStorage0.updateLease(leaseStartTime,
primaryReplicaNodeId, primaryReplicaNodeName);
return partitionStorage0.addWrite(rowId0, testData, txId,
COMMIT_TABLE_ID, 0);
});
@@ -156,6 +161,10 @@ public class RocksDbMvTableStorageTest extends
AbstractMvTableStorageTest {
assertThat(tableStorage.getMvPartition(PARTITION_ID_1),
is(nullValue()));
assertThat(unwrap(tableStorage.getMvPartition(PARTITION_ID).read(rowId0,
HybridTimestamp.MAX_VALUE).binaryRow()),
is(equalTo(unwrap(testData))));
+
+ assertEquals(leaseStartTime,
tableStorage.getMvPartition(PARTITION_ID).leaseStartTime());
+ assertEquals(primaryReplicaNodeId,
tableStorage.getMvPartition(PARTITION_ID).primaryReplicaNodeId());
+ assertEquals(primaryReplicaNodeName,
tableStorage.getMvPartition(PARTITION_ID).primaryReplicaNodeName());
}
@Test
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
index cd59061975..ed8f7455db 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
@@ -302,7 +302,8 @@ public class ReplicasSafeTimePropagationTest extends
IgniteAbstractTest {
mock(CatalogService.class),
mock(SchemaRegistry.class),
clockService,
- mock(IndexMetaStorage.class)
+ mock(IndexMetaStorage.class),
+
clusterService.topologyService().localMember().id()
),
RaftGroupEventsListener.noopLsnr,
RaftGroupOptions.defaults()
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index 0be8a177ca..3bef883622 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -486,7 +486,6 @@ public class StorageUpdateHandler {
ReadResult item = cursor.next();
- // TODO: https://issues.apache.org/jira/browse/IGNITE-20124
Prevent double storage updates within primary
if (item.isWriteIntent()) {
// We are aborting only those write intents that belong to
the provided transaction.
// TODO:
https://issues.apache.org/jira/browse/IGNITE-20347 to check transaction id in
the storage
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 166995dd32..a1872c6482 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -1099,7 +1099,8 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
catalogService,
table.schemaView(),
clockService,
- indexMetaStorage
+ indexMetaStorage,
+ topologyService.localMember().id()
);
SnapshotStorageFactory snapshotStorageFactory =
createSnapshotStorageFactory(replicaGrpId,
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
index d618e8911e..8f2fdc31f0 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
@@ -232,8 +232,14 @@ public interface PartitionDataStorage extends
ManuallyCloseable {
* Updates the current lease start time in the storage.
*
* @param leaseStartTime Lease start time.
+ * @param primaryReplicaNodeId Primary replica node id.
+ * @param primaryReplicaNodeName Primary replica node name.
*/
- void updateLease(long leaseStartTime);
+ void updateLease(
+ long leaseStartTime,
+ String primaryReplicaNodeId,
+ String primaryReplicaNodeName
+ );
/**
* Return the start time of the known lease for this replication group.
@@ -241,4 +247,18 @@ public interface PartitionDataStorage extends
ManuallyCloseable {
* @return Lease start time.
*/
long leaseStartTime();
+
+ /**
+ * Return the node id of the known lease for this replication group.
+ *
+ * @return Primary replica node id.
+ */
+ String primaryReplicaNodeId();
+
+ /**
+ * Return the node name of the known lease for this replication group.
+ *
+ * @return Primary replica node name.
+ */
+ String primaryReplicaNodeName();
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 11fd10dc3e..91fa878221 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -32,8 +32,10 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -131,6 +133,10 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
private final IndexMetaStorage indexMetaStorage;
+ private final String localNodeId;
+
+ private Set<String> currentGroupTopology;
+
/**
* Timestamp with minimum starting time among all active RW transactions
in the cluster.
* This timestamp is used to prevent the catalog from being dropped, which
may be used when applying raft commands.
@@ -148,7 +154,8 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
CatalogService catalogService,
SchemaRegistry schemaRegistry,
ClockService clockService,
- IndexMetaStorage indexMetaStorage
+ IndexMetaStorage indexMetaStorage,
+ String localNodeId
) {
this.txManager = txManager;
this.storage = partitionDataStorage;
@@ -160,6 +167,7 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
this.schemaRegistry = schemaRegistry;
this.clockService = clockService;
this.indexMetaStorage = indexMetaStorage;
+ this.localNodeId = localNodeId;
}
@Override
@@ -261,9 +269,7 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
assert safeTimePropagatingCommand.safeTime() != null;
- synchronized (safeTime) {
- updateTrackerIgnoringTrackerClosedException(safeTime,
safeTimePropagatingCommand.safeTime());
- }
+ updateTrackerIgnoringTrackerClosedException(safeTime,
safeTimePropagatingCommand.safeTime());
}
updateTrackerIgnoringTrackerClosedException(storageIndexTracker,
commandIndex);
@@ -280,7 +286,7 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
private UpdateCommandResult handleUpdateCommand(UpdateCommand cmd, long
commandIndex, long commandTerm) {
// Skips the write command because the storage has already executed it.
if (commandIndex <= storage.lastAppliedIndex()) {
- return new UpdateCommandResult(true);
+ return new UpdateCommandResult(true, isPrimaryInGroupTopology());
}
if (cmd.leaseStartTime() != null) {
@@ -289,38 +295,42 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
long storageLeaseStartTime = storage.leaseStartTime();
if (leaseStartTime != storageLeaseStartTime) {
- return new UpdateCommandResult(false, storageLeaseStartTime);
+ return new UpdateCommandResult(
+ false,
+ storageLeaseStartTime,
+ isPrimaryInGroupTopology()
+ );
}
}
UUID txId = cmd.txId();
- // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Proper
storage/raft index handling is required.
- synchronized (safeTime) {
- if (cmd.safeTime().compareTo(safeTime.current()) > 0) {
- storageUpdateHandler.handleUpdate(
- txId,
- cmd.rowUuid(),
- cmd.tablePartitionId().asTablePartitionId(),
- cmd.rowToUpdate(),
- !cmd.full(),
- () -> storage.lastApplied(commandIndex, commandTerm),
- cmd.full() ? cmd.safeTime() : null,
- cmd.lastCommitTimestamp(),
- indexIdsAtRwTxBeginTs(catalogService, txId,
storage.tableId())
- );
-
- updateTrackerIgnoringTrackerClosedException(safeTime,
cmd.safeTime());
- } else {
- // We MUST bump information about last updated index+term.
- // See a comment in #onWrite() for explanation.
- advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
- }
+ assert storage.primaryReplicaNodeId() != null;
+ assert localNodeId != null;
+
+ if (cmd.full() || !localNodeId.equals(storage.primaryReplicaNodeId()))
{
+ storageUpdateHandler.handleUpdate(
+ txId,
+ cmd.rowUuid(),
+ cmd.tablePartitionId().asTablePartitionId(),
+ cmd.rowToUpdate(),
+ !cmd.full(),
+ () -> storage.lastApplied(commandIndex, commandTerm),
+ cmd.full() ? cmd.safeTime() : null,
+ cmd.lastCommitTimestamp(),
+ indexIdsAtRwTxBeginTs(catalogService, txId,
storage.tableId())
+ );
+ } else {
+ // We MUST bump information about last updated index+term.
+ // See a comment in #onWrite() for explanation.
+ // If we get here, that means that we are collocated with primary
and data was already inserted there, thus it's only required
+ // to update information about index and term.
+ advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
}
replicaTouch(txId, cmd.txCoordinatorId(), cmd.full() ? cmd.safeTime()
: null, cmd.full());
- return new UpdateCommandResult(true);
+ return new UpdateCommandResult(true, isPrimaryInGroupTopology());
}
/**
@@ -333,7 +343,7 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
private UpdateCommandResult handleUpdateAllCommand(UpdateAllCommand cmd,
long commandIndex, long commandTerm) {
// Skips the write command because the storage has already executed it.
if (commandIndex <= storage.lastAppliedIndex()) {
- return new UpdateCommandResult(true);
+ return new UpdateCommandResult(true, isPrimaryInGroupTopology());
}
if (cmd.leaseStartTime() != null) {
@@ -342,36 +352,37 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
long storageLeaseStartTime = storage.leaseStartTime();
if (leaseStartTime != storageLeaseStartTime) {
- return new UpdateCommandResult(false, storageLeaseStartTime);
+ return new UpdateCommandResult(
+ false,
+ storageLeaseStartTime,
+ isPrimaryInGroupTopology()
+ );
}
}
UUID txId = cmd.txId();
- // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Proper
storage/raft index handling is required.
- synchronized (safeTime) {
- if (cmd.safeTime().compareTo(safeTime.current()) > 0) {
- storageUpdateHandler.handleUpdateAll(
- txId,
- cmd.rowsToUpdate(),
- cmd.tablePartitionId().asTablePartitionId(),
- !cmd.full(),
- () -> storage.lastApplied(commandIndex, commandTerm),
- cmd.full() ? cmd.safeTime() : null,
- indexIdsAtRwTxBeginTs(catalogService, txId,
storage.tableId())
- );
-
- updateTrackerIgnoringTrackerClosedException(safeTime,
cmd.safeTime());
- } else {
- // We MUST bump information about last updated index+term.
- // See a comment in #onWrite() for explanation.
- advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
- }
+ if (cmd.full() || !localNodeId.equals(storage.primaryReplicaNodeId()))
{
+ storageUpdateHandler.handleUpdateAll(
+ txId,
+ cmd.rowsToUpdate(),
+ cmd.tablePartitionId().asTablePartitionId(),
+ !cmd.full(),
+ () -> storage.lastApplied(commandIndex, commandTerm),
+ cmd.full() ? cmd.safeTime() : null,
+ indexIdsAtRwTxBeginTs(catalogService, txId,
storage.tableId())
+ );
+ } else {
+ // We MUST bump information about last updated index+term.
+ // See a comment in #onWrite() for explanation.
+ // If we get here, that means that we are collocated with primary
and data was already inserted there, thus it's only required
+ // to update information about index and term.
+ advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
}
replicaTouch(txId, cmd.txCoordinatorId(), cmd.full() ? cmd.safeTime()
: null, cmd.full());
- return new UpdateCommandResult(true);
+ return new UpdateCommandResult(true, isPrimaryInGroupTopology());
}
/**
@@ -488,6 +499,9 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
@Override
public void onConfigurationCommitted(CommittedConfiguration config) {
+ currentGroupTopology = new HashSet<>(config.peers());
+ currentGroupTopology.addAll(config.learners());
+
// Skips the update because the storage has already recorded it.
if (config.index() <= storage.lastAppliedIndex()) {
return;
@@ -669,7 +683,7 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
}
storage.runConsistently(locker -> {
- storage.updateLease(cmd.leaseStartTime());
+ storage.updateLease(cmd.leaseStartTime(),
cmd.primaryReplicaNodeId(), cmd.primaryReplicaNodeName());
storage.lastApplied(commandIndex, commandTerm);
@@ -798,4 +812,29 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
return upgradedBinaryRow == sourceBinaryRow ? source : new
BinaryRowAndRowId(upgradedBinaryRow, source.rowId());
}
+
+ /**
+ * Checks whether the primary replica belongs to the raft group topology
(peers and learners) within a raft linearized context.
+ * On the primary replica election prior to the lease publication, the
placement driver sends a PrimaryReplicaChangeCommand that
+ * populates the raft listener and the underlying storage with
lease-related information, such as primaryReplicaNodeId,
+ * primaryReplicaNodeName and leaseStartTime. In Update(All)Command
handling, which occurs strictly after PrimaryReplicaChangeCommand
+ * processing, this information is used in order to detect whether
the primary belongs to the raft group topology (peers and learners).
+ *
+ *
+ * @return {@code true} if the primary replica belongs to the raft group
topology (peers and learners), {@code false} otherwise.
+ */
+ private boolean isPrimaryInGroupTopology() {
+ assert currentGroupTopology != null : "Current group topology is null";
+ // TODO https://issues.apache.org/jira/browse/IGNITE-23030 Seems that
we have a bug. Lease related information is not restored on
+ // TODO snapshot load.
+ if (storage.primaryReplicaNodeName() == null) {
+ return true;
+ } else {
+ // Although storage.primaryReplicaNodeName() may
itself return null, that is never expected to happen
+ // while calling isPrimaryInGroupTopology because of the happens-before relation between
handlePrimaryReplicaChangeCommand, which populates the storage
+ // with lease information, and handleUpdate(All)Command, which in
its turn calls isPrimaryInGroupTopology.
+ assert storage.primaryReplicaNodeName() != null : "Primary replica
node name is null.";
+ return
currentGroupTopology.contains(storage.primaryReplicaNodeName());
+ }
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
index 8d122e7015..9483e5b5b3 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
@@ -227,12 +227,26 @@ public class SnapshotAwarePartitionDataStorage implements
PartitionDataStorage {
}
@Override
- public void updateLease(long leaseStartTime) {
- partitionStorage.updateLease(leaseStartTime);
+ public void updateLease(
+ long leaseStartTime,
+ String primaryReplicaNodeId,
+ String primaryReplicaNodeName
+ ) {
+ partitionStorage.updateLease(leaseStartTime, primaryReplicaNodeId,
primaryReplicaNodeName);
}
@Override
public long leaseStartTime() {
return partitionStorage.leaseStartTime();
}
+
+ @Override
+ public String primaryReplicaNodeId() {
+ return partitionStorage.primaryReplicaNodeId();
+ }
+
+ @Override
+ public String primaryReplicaNodeName() {
+ return partitionStorage.primaryReplicaNodeName();
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 42abb134d3..69f61e7a99 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -213,7 +213,6 @@ import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
-import org.apache.ignite.internal.util.TrackerClosedException;
import org.apache.ignite.lang.ErrorGroups.Replicator;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterNode;
@@ -2672,22 +2671,6 @@ public class PartitionReplicaListener implements
ReplicaListener {
HybridTimestamp safeTimeForRetry = clockService.now();
- // Within primary replica it's required to update safe
time in order to prevent double storage updates in case of !1PC.
- // Otherwise, it may be possible that a newer entry will
be overwritten by an older one that came as part of the raft
- // replication flow:
- // tx1 = transactions.begin();
- // tx1.put(k1, v1) -> primary.apply(k1,v1) + asynchronous
raft replication (k1,v1)
- // tx1.put(k1, v2) -> primary.apply(k1,v2) + asynchronous
raft replication (k1,v1)
- // (k1,v1) replication overrides newer (k1, v2).
Eventually (k1,v2) replication will restore proper value.
- // However it's possible that tx1.get(k1) will see v1
instead of v2.
- // TODO:
https://issues.apache.org/jira/browse/IGNITE-20124 Better solution requied.
Given one is correct, but fragile.
- if ((cmd instanceof UpdateCommand && !((UpdateCommand)
cmd).full())
- || (cmd instanceof UpdateAllCommand &&
!((UpdateAllCommand) cmd).full())) {
- synchronized (safeTime) {
-
updateTrackerIgnoringTrackerClosedException(safeTime, safeTimeForRetry);
- }
- }
-
SafeTimePropagatingCommand
clonedSafeTimePropagatingCommand =
(SafeTimePropagatingCommand)
safeTimePropagatingCommand.clone();
clonedSafeTimePropagatingCommand.safeTime(safeTimeForRetry);
@@ -2743,23 +2726,18 @@ public class PartitionReplicaListener implements
ReplicaListener {
);
if (!cmd.full()) {
- // TODO: https://issues.apache.org/jira/browse/IGNITE-20124
Temporary code below
- synchronized (safeTime) {
- // We don't need to take the partition snapshots read
lock, see #INTERNAL_DOC_PLACEHOLDER why.
- storageUpdateHandler.handleUpdate(
- cmd.txId(),
- cmd.rowUuid(),
- cmd.tablePartitionId().asTablePartitionId(),
- cmd.rowToUpdate(),
- true,
- null,
- null,
- null,
- indexIdsAtRwTxBeginTs(txId)
- );
-
- updateTrackerIgnoringTrackerClosedException(safeTime,
cmd.safeTime());
- }
+ // We don't need to take the partition snapshots read lock,
see #INTERNAL_DOC_PLACEHOLDER why.
+ storageUpdateHandler.handleUpdate(
+ cmd.txId(),
+ cmd.rowUuid(),
+ cmd.tablePartitionId().asTablePartitionId(),
+ cmd.rowToUpdate(),
+ true,
+ null,
+ null,
+ null,
+ indexIdsAtRwTxBeginTs(txId)
+ );
CompletableFuture<UUID> fut =
applyCmdWithExceptionHandling(cmd, new CompletableFuture<>())
.thenApply(res -> cmd.txId());
@@ -2770,37 +2748,33 @@ public class PartitionReplicaListener implements
ReplicaListener {
applyCmdWithExceptionHandling(cmd, resultFuture);
- return resultFuture.thenApply(res -> {
+ return resultFuture.thenCompose(res -> {
UpdateCommandResult updateCommandResult =
(UpdateCommandResult) res;
- if (full && updateCommandResult != null &&
!updateCommandResult.isPrimaryReplicaMatch()) {
+ if (!updateCommandResult.isPrimaryReplicaMatch()) {
throw new PrimaryReplicaMissException(txId,
cmd.leaseStartTime(), updateCommandResult.currentLeaseStartTime());
}
- // TODO:
https://issues.apache.org/jira/browse/IGNITE-20124 Temporary code below
- // Try to avoid double write if an entry is already
replicated.
- synchronized (safeTime) {
- if (cmd.safeTime().compareTo(safeTime.current()) > 0) {
- if
(!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK))
{
- // We don't need to take the partition
snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
- storageUpdateHandler.handleUpdate(
- cmd.txId(),
- cmd.rowUuid(),
-
cmd.tablePartitionId().asTablePartitionId(),
- cmd.rowToUpdate(),
- false,
- null,
- cmd.safeTime(),
- null,
- indexIdsAtRwTxBeginTs(txId)
- );
- }
-
-
updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime());
+ if (updateCommandResult.isPrimaryInPeersAndLearners()) {
+ return
safeTime.waitFor(cmd.safeTime()).thenApply(ignored -> null);
+ } else {
+ if
(!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK))
{
+ // We don't need to take the partition snapshots
read lock, see #INTERNAL_DOC_PLACEHOLDER why.
+ storageUpdateHandler.handleUpdate(
+ cmd.txId(),
+ cmd.rowUuid(),
+
cmd.tablePartitionId().asTablePartitionId(),
+ cmd.rowToUpdate(),
+ false,
+ null,
+ cmd.safeTime(),
+ null,
+ indexIdsAtRwTxBeginTs(txId)
+ );
}
- }
- return null;
+ return nullCompletedFuture();
+ }
});
}
}
@@ -2876,39 +2850,29 @@ public class PartitionReplicaListener implements
ReplicaListener {
if (!cmd.full()) {
if (skipDelayedAck) {
- // TODO:
https://issues.apache.org/jira/browse/IGNITE-20124 Temporary code below
- synchronized (safeTime) {
- // We don't need to take the partition snapshots read
lock, see #INTERNAL_DOC_PLACEHOLDER why.
- storageUpdateHandler.handleUpdateAll(
- cmd.txId(),
- cmd.rowsToUpdate(),
- cmd.tablePartitionId().asTablePartitionId(),
- true,
- null,
- null,
- indexIdsAtRwTxBeginTs(txId)
- );
-
- updateTrackerIgnoringTrackerClosedException(safeTime,
cmd.safeTime());
- }
+ // We don't need to take the partition snapshots read
lock, see #INTERNAL_DOC_PLACEHOLDER why.
+ storageUpdateHandler.handleUpdateAll(
+ cmd.txId(),
+ cmd.rowsToUpdate(),
+ cmd.tablePartitionId().asTablePartitionId(),
+ true,
+ null,
+ null,
+ indexIdsAtRwTxBeginTs(txId)
+ );
return applyCmdWithExceptionHandling(cmd, new
CompletableFuture<>()).thenApply(res -> null);
} else {
- // TODO:
https://issues.apache.org/jira/browse/IGNITE-20124 Temporary code below
- synchronized (safeTime) {
- // We don't need to take the partition snapshots read
lock, see #INTERNAL_DOC_PLACEHOLDER why.
- storageUpdateHandler.handleUpdateAll(
- cmd.txId(),
- cmd.rowsToUpdate(),
- cmd.tablePartitionId().asTablePartitionId(),
- true,
- null,
- null,
- indexIdsAtRwTxBeginTs(txId)
- );
-
- updateTrackerIgnoringTrackerClosedException(safeTime,
cmd.safeTime());
- }
+ // We don't need to take the partition snapshots read
lock, see #INTERNAL_DOC_PLACEHOLDER why.
+ storageUpdateHandler.handleUpdateAll(
+ cmd.txId(),
+ cmd.rowsToUpdate(),
+ cmd.tablePartitionId().asTablePartitionId(),
+ true,
+ null,
+ null,
+ indexIdsAtRwTxBeginTs(txId)
+ );
CompletableFuture<Object> fut =
applyCmdWithExceptionHandling(cmd, new CompletableFuture<>())
.thenApply(res -> cmd.txId());
@@ -2917,34 +2881,32 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
} else {
return applyCmdWithExceptionHandling(cmd, new
CompletableFuture<>())
- .thenApply(res -> {
+ .thenCompose(res -> {
UpdateCommandResult updateCommandResult =
(UpdateCommandResult) res;
- if (full &&
!updateCommandResult.isPrimaryReplicaMatch()) {
- throw new
PrimaryReplicaMissException(cmd.txId(), cmd.leaseStartTime(),
-
updateCommandResult.currentLeaseStartTime());
+ if (!updateCommandResult.isPrimaryReplicaMatch()) {
+ throw new PrimaryReplicaMissException(
+ cmd.txId(),
+ cmd.leaseStartTime(),
+
updateCommandResult.currentLeaseStartTime()
+ );
}
+ if
(updateCommandResult.isPrimaryInPeersAndLearners()) {
+ return
safeTime.waitFor(cmd.safeTime()).thenApply(ignored -> null);
+ } else {
+ // We don't need to take the partition
snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
+ storageUpdateHandler.handleUpdateAll(
+ cmd.txId(),
+ cmd.rowsToUpdate(),
+
cmd.tablePartitionId().asTablePartitionId(),
+ false,
+ null,
+ cmd.safeTime(),
+ indexIdsAtRwTxBeginTs(txId)
+ );
- // TODO:
https://issues.apache.org/jira/browse/IGNITE-20124 Temporary code below
- // Try to avoid double write if an entry is
already replicated.
- synchronized (safeTime) {
- if
(cmd.safeTime().compareTo(safeTime.current()) > 0) {
- // We don't need to take the partition
snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
- storageUpdateHandler.handleUpdateAll(
- cmd.txId(),
- cmd.rowsToUpdate(),
-
cmd.tablePartitionId().asTablePartitionId(),
- false,
- null,
- cmd.safeTime(),
- indexIdsAtRwTxBeginTs(txId)
- );
-
-
updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime());
- }
+ return nullCompletedFuture(); // thenCompose() needs a non-null stage; null would fail the future with an NPE.
}
-
- return null;
});
}
}
@@ -3961,18 +3923,6 @@ public class PartitionReplicaListener implements
ReplicaListener {
));
}
- // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Temporary code
below
- private static <T extends Comparable<T>> void
updateTrackerIgnoringTrackerClosedException(
- PendingComparableValuesTracker<T, Void> tracker,
- T newValue
- ) {
- try {
- tracker.update(newValue, null);
- } catch (TrackerClosedException ignored) {
- // No-op.
- }
- }
-
private static BuildIndexCommand
toBuildIndexCommand(BuildIndexReplicaRequest request, MetaIndexStatusChange
buildingChangeInfo) {
return PARTITION_REPLICATION_MESSAGES_FACTORY.buildIndexCommand()
.indexId(request.indexId())
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index 9bf90b13be..249168b7ec 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -49,6 +49,7 @@ import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -221,6 +222,8 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
private IndexMetaStorage indexMetaStorage;
+ private ClusterService clusterService;
+
/**
* Initializes a table listener before tests.
*/
@@ -228,9 +231,11 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
public void before() {
NetworkAddress addr = new NetworkAddress("127.0.0.1", 5003);
- ClusterService clusterService = mock(ClusterService.class,
RETURNS_DEEP_STUBS);
+ clusterService = mock(ClusterService.class, RETURNS_DEEP_STUBS);
when(clusterService.topologyService().localMember().address()).thenReturn(addr);
+
when(clusterService.topologyService().localMember().id()).thenReturn(addr.toString());
+ when(clusterService.nodeName()).thenReturn(addr.toString());
safeTimeTracker = new PendingComparableValuesTracker<>(new
HybridTimestamp(1, 0));
@@ -285,8 +290,30 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
catalogService,
SCHEMA_REGISTRY,
clockService,
- indexMetaStorage
+ indexMetaStorage,
+ clusterService.topologyService().localMember().id()
);
+
+ // Update(All)Command handling requires both information about raft
group topology and the primary replica,
+ // thus onConfigurationCommitted and primaryReplicaChangeCommand are
called.
+ {
+ commandListener.onConfigurationCommitted(new
CommittedConfiguration(
+ raftIndex.incrementAndGet(),
+ 1,
+ List.of(clusterService.nodeName()),
+ Collections.emptyList(),
+ null,
+ null
+ ));
+
+ PrimaryReplicaChangeCommand command =
REPLICA_MESSAGES_FACTORY.primaryReplicaChangeCommand()
+ .primaryReplicaNodeName("primary")
+ .primaryReplicaNodeId(UUID.randomUUID().toString())
+
.leaseStartTime(HybridTimestamp.MIN_VALUE.addPhysicalTime(1).longValue())
+ .build();
+
+
commandListener.onWrite(List.of(writeCommandCommandClosure(raftIndex.incrementAndGet(),
1, command)).iterator());
+ }
}
/**
@@ -386,11 +413,15 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
writeCommandCommandClosure(6, 1, primaryReplicaChangeCommand,
commandClosureResultCaptor)
).iterator());
- verify(mvPartitionStorage,
never()).runConsistently(any(WriteClosure.class));
- verify(mvPartitionStorage, times(1)).lastApplied(anyLong(), anyLong());
+ // Two storage runConsistently runs are expected: one for
configuration application and another for primaryReplicaChangeCommand
+ // handling. Both come from the initial configuration preparation in
@BeforeEach.
+ verify(mvPartitionStorage,
times(2)).runConsistently(any(WriteClosure.class));
+ verify(mvPartitionStorage, times(3)).lastApplied(anyLong(), anyLong());
- assertThat(updateCommandClosureResultCaptor.getAllValues(),
containsInAnyOrder(new UpdateCommandResult(true),
- new UpdateCommandResult(true)));
+ assertThat(updateCommandClosureResultCaptor.getAllValues(),
+ containsInAnyOrder(new UpdateCommandResult(true, false),
+ new UpdateCommandResult(true, false))
+ );
assertThat(commandClosureResultCaptor.getAllValues(),
containsInAnyOrder(new Throwable[]{null, null, null}));
// Checks for TxStateStorage.
@@ -473,7 +504,8 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
catalogService,
SCHEMA_REGISTRY,
clockService,
- indexMetaStorage
+ indexMetaStorage,
+ clusterService.topologyService().localMember().id()
);
txStateStorage.lastApplied(3L, 1L);
@@ -593,7 +625,7 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
@Test
void updatesGroupConfigurationOnConfigCommit() {
commandListener.onConfigurationCommitted(new CommittedConfiguration(
- 1, 2, List.of("peer"), List.of("learner"),
List.of("old-peer"), List.of("old-learner")
+ raftIndex.incrementAndGet(), 2, List.of("peer"),
List.of("learner"), List.of("old-peer"), List.of("old-learner")
));
RaftGroupConfiguration expectedConfig = new RaftGroupConfiguration(
@@ -610,10 +642,10 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
@Test
void updatesLastAppliedIndexAndTermOnConfigCommit() {
commandListener.onConfigurationCommitted(new CommittedConfiguration(
- 1, 2, List.of("peer"), List.of("learner"),
List.of("old-peer"), List.of("old-learner")
+ 3, 2, List.of("peer"), List.of("learner"),
List.of("old-peer"), List.of("old-learner")
));
- verify(mvPartitionStorage).lastApplied(1, 2);
+ verify(mvPartitionStorage).lastApplied(3, 2);
}
@Test
@@ -624,20 +656,21 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
1, 2, List.of("peer"), List.of("learner"),
List.of("old-peer"), List.of("old-learner")
));
- verify(mvPartitionStorage, never()).committedGroupConfiguration(any());
- verify(mvPartitionStorage, never()).lastApplied(eq(1L), anyLong());
+ // Exactly one call is expected because it's done in @BeforeEach in
order to prepare the initial configuration.
+ verify(mvPartitionStorage,
times(1)).committedGroupConfiguration(any());
+ verify(mvPartitionStorage, times(1)).lastApplied(eq(1L), anyLong());
}
@Test
void locksOnConfigCommit() {
commandListener.onConfigurationCommitted(new CommittedConfiguration(
- 1, 2, List.of("peer"), List.of("learner"),
List.of("old-peer"), List.of("old-learner")
+ raftIndex.incrementAndGet(), 2, List.of("peer"),
List.of("learner"), List.of("old-peer"), List.of("old-learner")
));
InOrder inOrder = inOrder(partitionDataStorage);
inOrder.verify(partitionDataStorage).acquirePartitionSnapshotsReadLock();
- inOrder.verify(partitionDataStorage).lastApplied(1, 2);
+ inOrder.verify(partitionDataStorage).lastApplied(raftIndex.get(), 2);
inOrder.verify(partitionDataStorage).releasePartitionSnapshotsReadLock();
}
@@ -706,10 +739,10 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
.build();
commandListener.onWrite(List.of(
- writeCommandCommandClosure(3, 2, command)
+ writeCommandCommandClosure(raftIndex.incrementAndGet(), 2,
command)
).iterator());
- verify(mvPartitionStorage).lastApplied(3, 2);
+ verify(mvPartitionStorage).lastApplied(raftIndex.get(), 2);
}
@ParameterizedTest
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index c737a153c1..c865bd856e 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -319,9 +319,9 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
return v;
});
- return completedFuture(new UpdateCommandResult(true));
+ return completedFuture(new UpdateCommandResult(true, true));
} else if (cmd instanceof UpdateAllCommand) {
- return completedFuture(new UpdateCommandResult(true));
+ return completedFuture(new UpdateCommandResult(true, true));
} else if (cmd instanceof FinishTxCommand) {
FinishTxCommand command = (FinishTxCommand) cmd;
@@ -1573,7 +1573,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
assertFalse(replicaCleanupFut.isDone());
- writeFut.complete(new UpdateCommandResult(true));
+ writeFut.complete(new UpdateCommandResult(true, true));
assertThat(replicaCleanupFut, willSucceedFast());
} finally {
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index ac02e96bd6..e2c3d32d70 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -91,6 +91,7 @@ import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.Peer;
@@ -111,6 +112,8 @@ import
org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
+import
org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.schema.BinaryRowConverter;
import org.apache.ignite.internal.schema.ColumnsExtractor;
import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -179,6 +182,8 @@ import org.junit.jupiter.api.TestInfo;
public class ItTxTestCluster {
private static final int SCHEMA_VERSION = 1;
+ private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new
ReplicaMessagesFactory();
+
private final List<NetworkAddress> localAddresses;
private final NodeFinder nodeFinder;
@@ -713,7 +718,9 @@ public class ItTxTestCluster {
catalogService,
schemaManager,
clockServices.get(assignment),
- mock(IndexMetaStorage.class)
+ mock(IndexMetaStorage.class),
+ // TODO use proper index.
+
clusterServices.get(assignment).topologyService().getByConsistentId(assignment).id()
);
Function<RaftGroupService, ReplicaListener>
createReplicaListener = raftClient -> newReplicaListener(
@@ -758,6 +765,35 @@ public class ItTxTestCluster {
allOf(partitionReadyFutures.toArray(new CompletableFuture[0])).join();
+ for (int p = 0; p < assignments.size(); p++) {
+ TablePartitionId grpId = grpIds.get(p);
+ CompletableFuture<ReplicaMeta> primaryFuture =
placementDriver.getPrimaryReplica(grpId,
+ clockServices.values().iterator().next().now());
+
+ // TestPlacementDriver always returns completed futures.
+ assert primaryFuture.isDone();
+
+ ReplicaMeta primary = primaryFuture.join();
+
+ assert primary.getLeaseholderId() != null;
+
+ PrimaryReplicaChangeCommand cmd =
REPLICA_MESSAGES_FACTORY.primaryReplicaChangeCommand()
+ .leaseStartTime(primary.getStartTime().longValue())
+ .primaryReplicaNodeId(primary.getLeaseholderId())
+ .primaryReplicaNodeName(primary.getLeaseholder())
+ .build();
+
+ CompletableFuture<RaftGroupService> raftClientFuture =
getRaftClientForGroup(grpId);
+
+ assertThat(raftClientFuture, willCompleteSuccessfully());
+
+ CompletableFuture<?> primaryReplicaChangePropagationFuture =
raftClientFuture.join().run(cmd);
+
+ partitionReadyFutures.add(primaryReplicaChangePropagationFuture);
+ }
+
+ allOf(partitionReadyFutures.toArray(new CompletableFuture[0])).join();
+
return table;
}
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
index 42cfb687b6..449e129ab7 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
@@ -159,12 +159,26 @@ public class TestPartitionDataStorage implements
PartitionDataStorage {
}
@Override
- public void updateLease(long leaseStartTime) {
- partitionStorage.updateLease(leaseStartTime);
+ public void updateLease(
+ long leaseStartTime,
+ String primaryReplicaNodeId,
+ String primaryReplicaNodeName
+ ) {
+ partitionStorage.updateLease(leaseStartTime, primaryReplicaNodeId,
primaryReplicaNodeName);
}
@Override
public long leaseStartTime() {
return partitionStorage.leaseStartTime();
}
+
+ @Override
+ public String primaryReplicaNodeId() {
+ return partitionStorage.primaryReplicaNodeId();
+ }
+
+ @Override
+ public String primaryReplicaNodeName() {
+ return partitionStorage.primaryReplicaNodeName();
+ }
}
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 5f747189e7..ef8c5822f6 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -60,11 +60,13 @@ import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.SingleClusterNodeResolver;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.raft.service.CommittedConfiguration;
import org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicaResult;
@@ -72,6 +74,8 @@ import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
+import
org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowConverter;
import org.apache.ignite.internal.schema.BinaryRowEx;
@@ -142,6 +146,8 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
private static final ReplicationGroupId crossTableGroupId = new
TablePartitionId(333, 0);
+ private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new
ReplicaMessagesFactory();
+
private PartitionListener partitionListener;
private ReplicaListener replicaListener;
@@ -285,7 +291,7 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
.when(replicaSvc).invoke(anyString(), any());
}
- AtomicLong raftIndex = new AtomicLong();
+ AtomicLong raftIndex = new AtomicLong(1);
// Delegate directly to listener.
lenient().doAnswer(
@@ -418,8 +424,36 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
catalogService,
schemaManager,
CLOCK_SERVICE,
- mock(IndexMetaStorage.class)
+ mock(IndexMetaStorage.class),
+ LOCAL_NODE.id()
);
+
+ // Update(All)Command handling requires both information about raft
group topology and the primary replica,
+ // thus onConfigurationCommitted and primaryReplicaChangeCommand are
called.
+ {
+ partitionListener.onConfigurationCommitted(new
CommittedConfiguration(
+ 1,
+ 1,
+ List.of(LOCAL_NODE.name()),
+ Collections.emptyList(),
+ null,
+ null
+ ));
+
+ CompletableFuture<ReplicaMeta> primaryMetaFuture =
placementDriver.getPrimaryReplica(groupId, CLOCK.now());
+
+ assertThat(primaryMetaFuture, willCompleteSuccessfully());
+
+ ReplicaMeta primary = primaryMetaFuture.join();
+
+ PrimaryReplicaChangeCommand primaryReplicaChangeCommand =
REPLICA_MESSAGES_FACTORY.primaryReplicaChangeCommand()
+ .leaseStartTime(primary.getStartTime().longValue())
+ .primaryReplicaNodeId(primary.getLeaseholderId())
+ .primaryReplicaNodeName(primary.getLeaseholder())
+ .build();
+
+ assertThat(svc.run(primaryReplicaChangeCommand),
willCompleteSuccessfully());
+ }
}
/**
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/UpdateCommandResult.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/UpdateCommandResult.java
index 5ddf95a66d..01d810969b 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/UpdateCommandResult.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/UpdateCommandResult.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.tx;
import java.io.Serializable;
+import java.util.Objects;
import org.jetbrains.annotations.Nullable;
/**
@@ -32,32 +33,43 @@ public class UpdateCommandResult implements Serializable {
@Nullable
private final Long currentLeaseStartTime;
+ /** {@code true} if primary replica belongs to the raft group topology:
peers and learners, {@code false} otherwise. */
+ private final boolean primaryInPeersAndLearners;
+
/**
* Constructor.
*
- * @param primaryReplicaMatch Whether the command should be successfully
applied on primary replica.
+ * @param primaryReplicaMatch Whether the command was executed
successfully or failed due to mismatch of primary replica information.
*/
- public UpdateCommandResult(boolean primaryReplicaMatch) {
- this(primaryReplicaMatch, null);
+ public UpdateCommandResult(boolean primaryReplicaMatch, boolean
primaryInPeersAndLearners) {
+ this(primaryReplicaMatch, null, primaryInPeersAndLearners);
}
/**
* Constructor.
*
- * @param primaryReplicaMatch Whether the command should be successfully
applied on primary replica.
+ * @param primaryReplicaMatch Whether the command was executed
successfully or failed due to mismatch of primary replica information.
* @param currentLeaseStartTime Actual lease start time.
+ * @param primaryInPeersAndLearners {@code true} if primary replica
belongs to the raft group topology: peers and learners,
+ * {@code false} otherwise.
*/
- public UpdateCommandResult(boolean primaryReplicaMatch, @Nullable Long
currentLeaseStartTime) {
+ public UpdateCommandResult(
+ boolean primaryReplicaMatch,
+ @Nullable Long currentLeaseStartTime,
+ boolean primaryInPeersAndLearners
+ ) {
assert primaryReplicaMatch || currentLeaseStartTime != null :
"Incorrect UpdateCommandResult.";
this.primaryReplicaMatch = primaryReplicaMatch;
this.currentLeaseStartTime = currentLeaseStartTime;
+ this.primaryInPeersAndLearners = primaryInPeersAndLearners;
}
/**
- * Whether the command should be successfully applied on primary replica.
+ * Whether the command was executed successfully or failed due to mismatch
of primary replica information, i.e. lease start time that
+ * was sent along with the command doesn't match the one in raft updated
by handlePrimaryReplicaChangeCommand.
*
- * @return Whether the command should be successfully applied on primary
replica.
+ * @return Whether the command was executed successfully or failed due to
mismatch of primary replica information.
*/
public boolean isPrimaryReplicaMatch() {
return primaryReplicaMatch;
@@ -73,6 +85,15 @@ public class UpdateCommandResult implements Serializable {
return currentLeaseStartTime;
}
+ /**
+ * Returns whether primary replica belongs to the raft group topology:
peers and learners.
+ *
+ * @return {@code true} if primary replica belongs to the raft group
topology: peers and learners, {@code false} otherwise.
+ */
+ public boolean isPrimaryInPeersAndLearners() {
+ return primaryInPeersAndLearners;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -81,20 +102,13 @@ public class UpdateCommandResult implements Serializable {
if (o == null || getClass() != o.getClass()) {
return false;
}
-
UpdateCommandResult that = (UpdateCommandResult) o;
-
- if (primaryReplicaMatch != that.primaryReplicaMatch) {
- return false;
- }
- return currentLeaseStartTime != null ?
currentLeaseStartTime.equals(that.currentLeaseStartTime)
- : that.currentLeaseStartTime == null;
+ return primaryReplicaMatch == that.primaryReplicaMatch &&
primaryInPeersAndLearners == that.primaryInPeersAndLearners
+ && Objects.equals(currentLeaseStartTime,
that.currentLeaseStartTime);
}
@Override
public int hashCode() {
- int result = (primaryReplicaMatch ? 1 : 0);
- result = 31 * result + (currentLeaseStartTime != null ?
currentLeaseStartTime.hashCode() : 0);
- return result;
+ return Objects.hash(primaryReplicaMatch, currentLeaseStartTime,
primaryInPeersAndLearners);
}
}