This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 97c531227e IGNITE-19234 Enable and fix group reentry logic for
volatile storages (#3763)
97c531227e is described below
commit 97c531227ebe74a2211626da5472ec05818152d4
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Thu May 16 10:52:24 2024 +0400
IGNITE-19234 Enable and fix group reentry logic for volatile storages
(#3763)
The reentry logic (see IGNITE-16668) was erroneously disabled. This commit
re-enables it and contains the fixes required to make it work (they are needed
because the common rebalancing code has changed since the logic was disabled).
---
.../ignite/internal/affinity/Assignments.java | 16 +++
.../raftsnapshot/ItTableRaftSnapshotsTest.java | 6 +-
.../PartitionReplicatorNodeRecovery.java | 113 +++++++++++++++------
.../internal/table/distributed/TableManager.java | 17 ++--
.../{HasDataResponse.java => DataPresence.java} | 18 ++--
.../table/distributed/message/HasDataResponse.java | 13 ++-
.../ignite/internal/utils/RebalanceUtilEx.java | 19 ++--
7 files changed, 143 insertions(+), 59 deletions(-)
diff --git
a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java
b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java
index d0498b8047..2f0d7fcbee 100644
---
a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java
+++
b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java
@@ -107,6 +107,22 @@ public class Assignments implements Serializable {
return force;
}
+ /**
+ * Adds an assignment to this collection of assignments.
+ *
+ * @param assignment Assignment to add.
+ */
+ public void add(Assignment assignment) {
+ nodes.add(assignment);
+ }
+
+ /**
+ * Returns {@code true} if this collection has no assignments, {@code
false} if it has some assignments.
+ */
+ public boolean isEmpty() {
+ return nodes.isEmpty();
+ }
+
/**
* Serializes the instance into an array of bytes.
*/
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
index 70b68a98e6..fadc8e6510 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
@@ -65,6 +65,7 @@ import
org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
import
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import
org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.incoming.IncomingSnapshotCopier;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse;
@@ -172,9 +173,8 @@ class ItTableRaftSnapshotsTest extends
IgniteIntegrationTest {
@ParameterizedTest
@ValueSource(strings = {
RocksDbStorageEngine.ENGINE_NAME,
- PersistentPageMemoryStorageEngine.ENGINE_NAME
- // TODO: uncomment when
https://issues.apache.org/jira/browse/IGNITE-19234 is fixed
-// VolatilePageMemoryStorageEngine.ENGINE_NAME
+ PersistentPageMemoryStorageEngine.ENGINE_NAME,
+ VolatilePageMemoryStorageEngine.ENGINE_NAME
})
void leaderFeedsFollowerWithSnapshot(String storageEngine) throws
Exception {
testLeaderFeedsFollowerWithSnapshot(storageEngine);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java
index fc34504bd5..8c22b1030e 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java
@@ -20,17 +20,20 @@ package org.apache.ignite.internal.table.distributed;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
import java.util.Arrays;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.IntFunction;
@@ -43,9 +46,12 @@ import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageClosedException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.table.distributed.message.DataPresence;
import org.apache.ignite.internal.table.distributed.message.HasDataRequest;
import org.apache.ignite.internal.table.distributed.message.HasDataResponse;
import org.apache.ignite.internal.utils.RebalanceUtilEx;
@@ -70,6 +76,8 @@ class PartitionReplicatorNodeRecovery {
private final TopologyService topologyService;
+ private final Executor storageAccessExecutor;
+
/** Obtains a TableImpl instance by a table ID. */
private final IntFunction<TableViewInternal> tableById;
@@ -77,11 +85,13 @@ class PartitionReplicatorNodeRecovery {
MetaStorageManager metaStorageManager,
MessagingService messagingService,
TopologyService topologyService,
+ Executor storageAccessExecutor,
IntFunction<TableViewInternal> tableById
) {
this.metaStorageManager = metaStorageManager;
this.messagingService = messagingService;
this.topologyService = topologyService;
+ this.storageAccessExecutor = storageAccessExecutor;
this.tableById = tableById;
}
@@ -100,39 +110,58 @@ class PartitionReplicatorNodeRecovery {
HasDataRequest msg = (HasDataRequest) message;
- int tableId = msg.tableId();
- int partitionId = msg.partitionId();
+ storageAccessExecutor.execute(() -> handleHasDataRequest(msg,
sender, correlationId));
+ }
+ });
+ }
- boolean storageHasData = false;
+ private void handleHasDataRequest(HasDataRequest msg, ClusterNode sender,
Long correlationId) {
+ int tableId = msg.tableId();
+ int partitionId = msg.partitionId();
- TableViewInternal table = tableById.apply(tableId);
+ DataPresence dataPresence = DataPresence.UNKNOWN;
- if (table != null) {
- MvTableStorage storage = table.internalTable().storage();
+ TableViewInternal table = tableById.apply(tableId);
- MvPartitionStorage mvPartition =
storage.getMvPartition(partitionId);
+ if (table != null) {
+ MvTableStorage storage = table.internalTable().storage();
- // If node's recovery process is incomplete (no partition
storage), then we consider this node's
- // partition storage empty.
- if (mvPartition != null) {
- storageHasData =
mvPartition.closestRowId(RowId.lowestRowId(partitionId)) != null;
- }
- }
+ MvPartitionStorage mvPartition =
storage.getMvPartition(partitionId);
- messagingService.respond(sender,
TABLE_MESSAGES_FACTORY.hasDataResponse().result(storageHasData).build(),
correlationId);
+ if (mvPartition != null) {
+ try {
+ dataPresence =
mvPartition.closestRowId(RowId.lowestRowId(partitionId)) != null
+ ? DataPresence.HAS_DATA : DataPresence.EMPTY;
+ } catch (StorageClosedException | StorageRebalanceException
ignored) {
+ // Ignoring so we'll return UNKNOWN as the data presence,
meaning that we have no idea.
+ }
}
- });
+ }
+
+ messagingService.respond(
+ sender,
+
TABLE_MESSAGES_FACTORY.hasDataResponse().presenceString(dataPresence.name()).build(),
+ correlationId
+ );
}
/**
- * Returns a future that completes with a decision: should we start the
corresponding group locally or not.
+ * Initiates group reentry (that is, exits the group and then enters it
again) if there is a possibility that
+ * this node lost its Raft metastorage state. This trick allows solving
the double-voting problem (this node
+ * could vote for one candidate, then do a restart (losing its Raft
metastorage, including votedFor field), then
+ * vote for another candidate in the same term). As a result of removing
itself and adding itself back, the term
+ * will be incremented, so the possible old vote will be invalidated.
+ *
+ * <p>The possibility of losing the Raft metastorage state is detected by
checking if the partition storage is
+ * volatile (and hence Raft metastorage is also volatile).
*
* @param tablePartitionId ID of the table partition.
* @param internalTable Table we are working with.
* @param newConfiguration New configuration that is going to be applied
if we'll start the group.
* @param localMemberAssignment Assignment of this node in this group.
+ * @return A future that completes with a decision: should we start the
corresponding group locally or not.
*/
- CompletableFuture<Boolean> shouldStartGroup(
+ CompletableFuture<Boolean> initiateGroupReentryIfNeeded(
TablePartitionId tablePartitionId,
InternalTable internalTable,
PeersAndLearners newConfiguration,
@@ -163,15 +192,15 @@ class PartitionReplicatorNodeRecovery {
// No majority and not a full partition restart - need to 'remove,
then add' nodes
// with current partition.
- return waitForPeersAndQueryDataNodesCount(tableId, partId,
newConfiguration.peers())
- .thenApply(dataNodesCount -> {
- boolean fullPartitionRestart = dataNodesCount == 0;
+ return waitForPeersAndQueryDataNodesCounts(tableId, partId,
newConfiguration.peers())
+ .thenApply(dataNodesCounts -> {
+ boolean fullPartitionRestart = dataNodesCounts.emptyNodes
== newConfiguration.peers().size();
if (fullPartitionRestart) {
return true;
}
- boolean majorityAvailable = dataNodesCount >=
(newConfiguration.peers().size() / 2) + 1;
+ boolean majorityAvailable = dataNodesCounts.nonEmptyNodes
>= (newConfiguration.peers().size() / 2) + 1;
if (majorityAvailable) {
RebalanceUtilEx.startPeerRemoval(tablePartitionId,
localMemberAssignment, metaStorageManager);
@@ -193,13 +222,13 @@ class PartitionReplicatorNodeRecovery {
* @param tblId Table id.
* @param partId Partition id.
* @param peers Raft peers.
- * @return A future that will hold the quantity of data nodes.
+ * @return A future that will hold the counts of data nodes.
*/
- private CompletableFuture<Long> waitForPeersAndQueryDataNodesCount(int
tblId, int partId, Collection<Peer> peers) {
+ private CompletableFuture<DataNodesCounts>
waitForPeersAndQueryDataNodesCounts(int tblId, int partId, Collection<Peer>
peers) {
HasDataRequest request =
TABLE_MESSAGES_FACTORY.hasDataRequest().tableId(tblId).partitionId(partId).build();
return allPeersAreInTopology(peers)
- .thenCompose(unused -> queryDataNodesCount(peers, request));
+ .thenCompose(unused -> queryDataNodesCounts(peers, request));
}
private CompletableFuture<?> allPeersAreInTopology(Collection<Peer> peers)
{
@@ -274,9 +303,9 @@ class PartitionReplicatorNodeRecovery {
.thenCompose(identity());
}
- private CompletableFuture<Long> queryDataNodesCount(Collection<Peer>
peers, HasDataRequest request) {
+ private CompletableFuture<DataNodesCounts>
queryDataNodesCounts(Collection<Peer> peers, HasDataRequest request) {
//noinspection unchecked
- CompletableFuture<Boolean>[] requestFutures = peers.stream()
+ CompletableFuture<DataPresence>[] presenceFutures = peers.stream()
.map(Peer::consistentId)
.map(topologyService::getByConsistentId)
.filter(Objects::nonNull)
@@ -285,12 +314,36 @@ class PartitionReplicatorNodeRecovery {
.thenApply(response -> {
assert response instanceof HasDataResponse :
response;
- return ((HasDataResponse) response).result();
+ return ((HasDataResponse) response).presence();
})
- .exceptionally(unused -> false))
+ .exceptionally(unused -> DataPresence.UNKNOWN))
.toArray(CompletableFuture[]::new);
- return allOf(requestFutures)
- .thenApply(unused ->
Arrays.stream(requestFutures).filter(CompletableFuture::join).count());
+ return allOf(presenceFutures)
+ .thenApply(unused -> {
+ List<DataPresence> hasDataFlags =
Arrays.stream(presenceFutures)
+ .map(CompletableFuture::join)
+ .collect(toList());
+
+ long nodesSurelyHavingData =
hasDataFlags.stream().filter(presence -> presence ==
DataPresence.HAS_DATA).count();
+ long nodesSurelyEmpty =
hasDataFlags.stream().filter(presence -> presence ==
DataPresence.EMPTY).count();
+ return new DataNodesCounts(nodesSurelyHavingData,
nodesSurelyEmpty);
+ });
+ }
+
+ /**
+ * It is not guaranteed that {@link #nonEmptyNodes} plus {@link
#emptyNodes} gives the replicator group size
+ * as for some nodes we don't know at the moment whether they have data or
not.
+ */
+ private static class DataNodesCounts {
+ /** Number of nodes that reported that they have some data for the
partition of interest. */
+ private final long nonEmptyNodes;
+ /** Number of nodes that reported that they don't have any data for the
partition of interest. */
+ private final long emptyNodes;
+
+ private DataNodesCounts(long nonEmptyNodes, long emptyNodes) {
+ this.nonEmptyNodes = nonEmptyNodes;
+ this.emptyNodes = emptyNodes;
+ }
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index bda552db27..5dfb1ad1e4 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -287,7 +287,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
/**
* Versioned value for tracking RAFT groups initialization and starting
completion.
*
- * <p>Only explicitly updated in {@link
#startLocalPartitionsAndClients(CompletableFuture, TableImpl, int)}.
+ * <p>Only explicitly updated in {@link
#startLocalPartitionsAndClients(CompletableFuture, TableImpl, int, boolean)}.
*
* <p>Completed strictly after {@link #localPartitionsVv}.
*/
@@ -385,7 +385,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
/** Versioned value used only at manager startup to correctly fire table
creation events. */
private final IncrementalVersionedValue<Void> startVv;
- /** Ends at the {@link #stop()} with an {@link NodeStoppingException}. */
+ /** Ends at the {@link #stopAsync()} with an {@link
NodeStoppingException}. */
private final CompletableFuture<Void> stopManagerFuture = new
CompletableFuture<>();
/** Configuration for {@link StorageUpdateHandler}. */
@@ -570,6 +570,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
metaStorageMgr,
messagingService,
topologyService,
+ partitionOperationsExecutor,
tableId -> tablesById().get(tableId)
);
@@ -830,12 +831,14 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
* @param assignmentsFuture Table assignments.
* @param table Initialized table entity.
* @param zoneId Zone id.
+ * @param isRecovery {@code true} if the node is being started up.
* @return future, which will be completed when the partitions creations
done.
*/
private CompletableFuture<Void> startLocalPartitionsAndClients(
CompletableFuture<List<Assignments>> assignmentsFuture,
TableImpl table,
- int zoneId
+ int zoneId,
+ boolean isRecovery
) {
int tableId = table.tableId();
@@ -858,7 +861,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
assignments.get(partId),
null,
zoneId,
- false
+ isRecovery
)
.whenComplete((res, ex) -> {
if (ex != null) {
@@ -934,7 +937,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
if (localMemberAssignment != null) {
CompletableFuture<Boolean> shouldStartGroupFut = isRecovery
- ? partitionReplicatorNodeRecovery.shouldStartGroup(
+ ?
partitionReplicatorNodeRecovery.initiateGroupReentryIfNeeded(
replicaGrpId,
internalTbl,
newConfiguration,
@@ -1392,7 +1395,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
registerIndexesToTable(table, catalogService,
partitionSet, schemaRegistry, lwm);
}
- return
startLocalPartitionsAndClients(assignmentsFuture, table, zoneDescriptor.id());
+ return
startLocalPartitionsAndClients(assignmentsFuture, table, zoneDescriptor.id(),
onNodeRecovery);
}
), ioExecutor);
});
@@ -2528,7 +2531,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
/**
* Returns the future that will complete when, either the future from the
argument or {@link #stopManagerFuture} will complete,
- * successfully or exceptionally. Allows to protect from getting stuck at
{@link #stop()} when someone is blocked (by using
+ * successfully or exceptionally. Allows to protect from getting stuck at
{@link #stopAsync()} when someone is blocked (by using
* {@link #busyLock}) for a long time.
*
* @param future Future.
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/message/HasDataResponse.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/message/DataPresence.java
similarity index 65%
copy from
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/message/HasDataResponse.java
copy to
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/message/DataPresence.java
index 66f7b0f6d7..67fc4cbdbc 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/message/HasDataResponse.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/message/DataPresence.java
@@ -17,15 +17,15 @@
package org.apache.ignite.internal.table.distributed.message;
-import org.apache.ignite.internal.network.NetworkMessage;
-import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.table.distributed.TableMessageGroup;
-
/**
- * A response to the {@link HasDataRequest}.
+ * Whether a node has data or not (or it's not known because it did not
respond in time, or the corresponding storage is
+ * already closed or still being rebalanced to).
*/
-@Transferable(TableMessageGroup.HAS_DATA_RESPONSE)
-public interface HasDataResponse extends NetworkMessage {
- /** {@code true} if a node has data for a partition of a table, {@code
false} otherwise. */
- boolean result();
+public enum DataPresence {
+ /** The storage is empty. */
+ EMPTY,
+ /** The storage has some data. */
+ HAS_DATA,
+ /** We don't know for some reason. */
+ UNKNOWN
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/message/HasDataResponse.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/message/HasDataResponse.java
index 66f7b0f6d7..69d7bd994e 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/message/HasDataResponse.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/message/HasDataResponse.java
@@ -26,6 +26,15 @@ import
org.apache.ignite.internal.table.distributed.TableMessageGroup;
*/
@Transferable(TableMessageGroup.HAS_DATA_RESPONSE)
public interface HasDataResponse extends NetworkMessage {
- /** {@code true} if a node has data for a partition of a table, {@code
false} otherwise. */
- boolean result();
+ /**
+ * Data presence indicator.
+ */
+ default DataPresence presence() {
+ return DataPresence.valueOf(presenceString());
+ }
+
+ /**
+ * String representation of {@link #presence()}.
+ */
+ String presenceString();
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtilEx.java
b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtilEx.java
index 451e98c4c3..7a2a4da589 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtilEx.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtilEx.java
@@ -38,6 +38,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.affinity.AffinityUtils;
import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.affinity.Assignments;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -73,23 +74,23 @@ public class RebalanceUtilEx {
byte[] prevValue =
retrievedAssignmentsSwitchReduce.value();
if (prevValue != null) {
- Set<Assignment> prev = ByteUtils.fromBytes(prevValue);
+ Assignments prev = Assignments.fromBytes(prevValue);
prev.add(peerAssignment);
return metaStorageMgr.invoke(
revision(key).eq(retrievedAssignmentsSwitchReduce.revision()),
- put(key, ByteUtils.toBytes(prev)),
+ put(key, prev.toBytes()),
Operations.noop()
);
} else {
- var newValue = new HashSet<>();
+ var newValue = Assignments.of(new HashSet<>());
newValue.add(peerAssignment);
return metaStorageMgr.invoke(
notExists(key),
- put(key, ByteUtils.toBytes(newValue)),
+ put(key, newValue.toBytes()),
Operations.noop()
);
}
@@ -118,7 +119,9 @@ public class RebalanceUtilEx {
Entry entry = event.entryEvent().newEntry();
byte[] eventData = entry.value();
- Set<Assignment> switchReduce = ByteUtils.fromBytes(eventData);
+ assert eventData != null : "Null event data for " + partId;
+
+ Assignments switchReduce = Assignments.fromBytes(eventData);
if (switchReduce.isEmpty()) {
return nullCompletedFuture();
@@ -128,10 +131,10 @@ public class RebalanceUtilEx {
ByteArray pendingKey = pendingPartAssignmentsKey(partId);
- Set<Assignment> pendingAssignments = difference(assignments,
switchReduce);
+ Set<Assignment> pendingAssignments = difference(assignments,
switchReduce.nodes());
- byte[] pendingByteArray = ByteUtils.toBytes(pendingAssignments);
- byte[] assignmentsByteArray = ByteUtils.toBytes(assignments);
+ byte[] pendingByteArray = Assignments.toBytes(pendingAssignments);
+ byte[] assignmentsByteArray = Assignments.toBytes(assignments);
ByteArray changeTriggerKey = pendingChangeTriggerKey(partId);
byte[] rev = ByteUtils.longToBytes(entry.revision());