This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ebb092f0ce IGNITE-20609 Move last commit timestamp into
BinaryRowMessage (#2749)
ebb092f0ce is described below
commit ebb092f0ced589657411d23277606250ef467c6e
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Tue Oct 31 16:52:45 2023 +0300
IGNITE-20609 Move last commit timestamp into BinaryRowMessage (#2749)
---
.../ignite/distributed/ItTablePersistenceTest.java | 8 +-
.../ignite/internal/table/ItColocationTest.java | 21 +++--
.../table/distributed/StorageUpdateHandler.java | 16 ++--
.../table/distributed/TableMessageGroup.java | 8 ++
...dateCommand.java => TimedBinaryRowMessage.java} | 47 +++++------
.../distributed/command/UpdateAllCommand.java | 25 +++---
.../table/distributed/command/UpdateCommand.java | 20 ++---
.../table/distributed/raft/PartitionListener.java | 14 +++-
.../replicator/PartitionReplicaListener.java | 92 +++++++++++-----------
.../distributed/replicator/TimedBinaryRow.java | 4 +-
.../internal/table/distributed/IndexBaseTest.java | 7 +-
.../distributed/StorageUpdateHandlerTest.java | 4 +-
.../PartitionRaftCommandsSerializationTest.java | 49 ++++++++----
.../raft/PartitionCommandListenerTest.java | 37 ++++++---
14 files changed, 193 insertions(+), 159 deletions(-)
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index 1297653efc..fddf2b1e69 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -310,12 +310,12 @@ public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<Parti
.txId(req0.transactionId())
.tablePartitionId(tablePartitionId(new
TablePartitionId(1, 0)))
.rowUuid(new RowId(0).uuid())
- .rowMessage(
- msgFactory.binaryRowMessage()
+ .messageRowToUpdate(msgFactory.timedBinaryRowMessage()
+ .binaryRowMessage(msgFactory.binaryRowMessage()
.schemaVersion(req0.schemaVersion())
.binaryTuple(req0.binaryTuple())
- .build()
- )
+ .build())
+ .build())
.safeTimeLong(hybridClock.nowLong())
.txCoordinatorId(UUID.randomUUID().toString())
.build();
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 611d33fe36..2b0fe309f9 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -50,7 +50,6 @@ import java.util.Set;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -73,6 +72,7 @@ import
org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import
org.apache.ignite.internal.table.distributed.command.TimedBinaryRowMessage;
import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
import
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
@@ -207,9 +207,14 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
if (request instanceof ReadWriteMultiRowReplicaRequest) {
ReadWriteMultiRowReplicaRequest multiRowReplicaRequest =
(ReadWriteMultiRowReplicaRequest) request;
- Map<UUID, BinaryRowMessage> rows =
multiRowReplicaRequest.binaryTuples().stream()
- .map(tupleBuffer -> binaryRowMessage(tupleBuffer,
multiRowReplicaRequest))
- .collect(toMap(row ->
TestTransactionIds.newTransactionId(), Function.identity()));
+
+ Map<UUID, TimedBinaryRowMessage> rows =
multiRowReplicaRequest.binaryTuples().stream()
+ .collect(
+ toMap(tupleBuffer ->
TestTransactionIds.newTransactionId(),
+ tupleBuffer ->
MSG_FACTORY.timedBinaryRowMessage()
+
.binaryRowMessage(binaryRowMessage(tupleBuffer, multiRowReplicaRequest))
+ .build())
+ );
return r.run(MSG_FACTORY.updateAllCommand()
.tablePartitionId(MSG_FACTORY.tablePartitionIdMessage()
@@ -217,7 +222,7 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
.partitionId(commitPartId.partitionId())
.build()
)
- .rowsToUpdate(rows)
+ .messageRowsToUpdate(rows)
.txId(UUID.randomUUID())
.txCoordinatorId(node.id())
.build());
@@ -234,7 +239,9 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
.build()
)
.rowUuid(UUID.randomUUID())
-
.rowMessage(binaryRowMessage(singleRowReplicaRequest.binaryTuple(),
singleRowReplicaRequest))
+ .messageRowToUpdate(MSG_FACTORY.timedBinaryRowMessage()
+
.binaryRowMessage(binaryRowMessage(singleRowReplicaRequest.binaryTuple(),
singleRowReplicaRequest))
+ .build())
.txId(TestTransactionIds.newTransactionId())
.txCoordinatorId(node.id())
.build());
@@ -390,7 +397,7 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
assertEquals(partsMap.get(p), cmd.rowsToUpdate().size(), () ->
"part=" + p + ", set=" + set);
cmd.rowsToUpdate().values().forEach(rowMessage -> {
- Row r = Row.wrapBinaryRow(schema, rowMessage.asBinaryRow());
+ Row r = Row.wrapBinaryRow(schema, rowMessage.binaryRow());
assertEquals(intTable.partition(r), p);
});
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index 155d8e792d..b7c4f9f367 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -35,8 +35,8 @@ import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
-import
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import org.apache.ignite.internal.table.distributed.replicator.PendingRows;
+import org.apache.ignite.internal.table.distributed.replicator.TimedBinaryRow;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
@@ -167,16 +167,14 @@ public class StorageUpdateHandler {
* @param trackWriteIntent If {@code true} then write intent should be
tracked.
* @param onApplication Callback on application.
* @param commitTs Commit timestamp to use on autocommit.
- * @param lastCommitTsMap A map(Row Id -> timestamp) of timestamps of the
most recent commits to the affected rows.
*/
public void handleUpdateAll(
UUID txId,
- Map<UUID, BinaryRowMessage> rowsToUpdate,
+ Map<UUID, TimedBinaryRow> rowsToUpdate,
TablePartitionId commitPartitionId,
boolean trackWriteIntent,
@Nullable Runnable onApplication,
- @Nullable HybridTimestamp commitTs,
- Map<UUID, HybridTimestamp> lastCommitTsMap
+ @Nullable HybridTimestamp commitTs
) {
indexUpdateHandler.waitIndexes();
@@ -188,15 +186,15 @@ public class StorageUpdateHandler {
List<RowId> rowIds = new ArrayList<>();
// Sort IDs to prevent deadlock. Natural UUID order matches
RowId order within the same partition.
- SortedMap<UUID, BinaryRowMessage> sortedRowsToUpdateMap = new
TreeMap<>(rowsToUpdate);
+ SortedMap<UUID, TimedBinaryRow> sortedRowsToUpdateMap = new
TreeMap<>(rowsToUpdate);
- for (Map.Entry<UUID, BinaryRowMessage> entry :
sortedRowsToUpdateMap.entrySet()) {
+ for (Map.Entry<UUID, TimedBinaryRow> entry :
sortedRowsToUpdateMap.entrySet()) {
RowId rowId = new RowId(partitionId, entry.getKey());
- BinaryRow row = entry.getValue() == null ? null :
entry.getValue().asBinaryRow();
+ BinaryRow row = entry.getValue() == null ? null :
entry.getValue().binaryRow();
locker.lock(rowId);
- performStorageCleanupIfNeeded(txId, rowId,
lastCommitTsMap.get(entry.getKey()));
+ performStorageCleanupIfNeeded(txId, rowId,
entry.getValue() == null ? null : entry.getValue().commitTimestamp());
if (commitTs != null) {
storage.addWriteCommitted(rowId, row, commitTs);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
index 38ddd35f0c..9e697fa2d5 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.internal.table.distributed.TableMessageGroup.GRO
import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
import
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
+import
org.apache.ignite.internal.table.distributed.command.TimedBinaryRowMessage;
import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
@@ -37,6 +38,8 @@ import
org.apache.ignite.internal.table.distributed.raft.snapshot.message.Snapsh
import
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import
org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
import
org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest;
+import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyDirectMultiRowReplicaRequest;
+import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyDirectSingleRowReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowPkReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlySingleRowPkReplicaRequest;
@@ -175,6 +178,11 @@ public interface TableMessageGroup {
*/
short RO_DIRECT_MULTI_ROW_REPLICA_REQUEST = 23;
+ /**
+ * Message type for {@link TimedBinaryRowMessage}.
+ */
+ short TIMED_BINARY_ROW_MESSAGE = 24;
+
/**
* Message types for Table module RAFT commands.
*
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TimedBinaryRowMessage.java
similarity index 54%
copy from
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
copy to
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TimedBinaryRowMessage.java
index 3e47dfd574..602650c0b1 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TimedBinaryRowMessage.java
@@ -17,47 +17,38 @@
package org.apache.ignite.internal.table.distributed.command;
-import static
org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp;
-
-import java.util.UUID;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
+import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
import org.jetbrains.annotations.Nullable;
/**
- * State machine command to update a row specified by a row id.
+ * This message type represents a binary row and a timestamp.
*/
-@Transferable(TableMessageGroup.Commands.UPDATE)
-public interface UpdateCommand extends PartitionCommand {
- TablePartitionIdMessage tablePartitionId();
-
- UUID rowUuid();
-
- @Nullable
- BinaryRowMessage rowMessage();
-
- String txCoordinatorId();
-
- /** Returns the row to update or {@code null} if the row should be
removed. */
- @Nullable
- default BinaryRow row() {
- BinaryRowMessage message = rowMessage();
-
- return message == null ? null : message.asBinaryRow();
- }
+@Transferable(TableMessageGroup.TIMED_BINARY_ROW_MESSAGE)
+public interface TimedBinaryRowMessage extends NetworkMessage {
+ /**
+ * Gets a binary row message.
+ *
+ * @return binary row message.
+ */
+ @Nullable BinaryRowMessage binaryRowMessage();
/**
- * Returns the timestamp of the last committed entry.
+ * Gets a timestamp.
+ *
+ * @return Timestamp as long.
*/
- long lastCommitTimestampLong();
+ long timestamp();
/**
- * Returns the timestamp of the last committed entry.
+ * Gets a binary row from this message or {@code null} if the binary row
message is {@code null}.
+ *
+ * @return Binary row or {@code null}.
*/
- default @Nullable HybridTimestamp lastCommitTimestamp() {
- return nullableHybridTimestamp(lastCommitTimestampLong());
+ default @Nullable BinaryRow binaryRow() {
+ return binaryRowMessage() == null ? null :
binaryRowMessage().asBinaryRow();
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java
index 4d75fd22be..e230361c4f 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java
@@ -17,16 +17,15 @@
package org.apache.ignite.internal.table.distributed.command;
-import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static
org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
-import
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
+import org.apache.ignite.internal.table.distributed.replicator.TimedBinaryRow;
+import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.network.annotations.Transferable;
-import org.jetbrains.annotations.Nullable;
/**
* State machine command for updating a batch of entries.
@@ -35,23 +34,23 @@ import org.jetbrains.annotations.Nullable;
public interface UpdateAllCommand extends PartitionCommand {
TablePartitionIdMessage tablePartitionId();
- Map<UUID, BinaryRowMessage> rowsToUpdate();
+ Map<UUID, TimedBinaryRowMessage> messageRowsToUpdate();
String txCoordinatorId();
- // TODO: IGNITE-20609 row id in this map duplicates row id in rowsToUpdate.
- @Nullable Map<UUID, Long> lastCommitTimestampsLong();
-
/**
* Returns the timestamps of the last committed entries for each row.
*/
- default Map<UUID, HybridTimestamp> lastCommitTimestamps() {
- Map<UUID, HybridTimestamp> map = new HashMap<>();
+ default Map<UUID, TimedBinaryRow> rowsToUpdate() {
+ Map<UUID, TimedBinaryRow> map = new HashMap<>();
+
+ Map<UUID, TimedBinaryRowMessage> timedRowMap = messageRowsToUpdate();
- Map<UUID, Long> uuidLongMap = lastCommitTimestampsLong();
- if (uuidLongMap != null) {
- uuidLongMap.forEach((uuid, ts) -> map.put(uuid,
hybridTimestamp(ts)));
+ if (!CollectionUtils.nullOrEmpty(timedRowMap)) {
+ timedRowMap.forEach(
+ (uuid, trMsg) -> map.put(uuid, new
TimedBinaryRow(trMsg.binaryRow(), nullableHybridTimestamp(trMsg.timestamp()))));
}
+
return map;
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
index 3e47dfd574..f283fa6ceb 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
@@ -23,7 +23,6 @@ import java.util.UUID;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
-import
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import org.apache.ignite.network.annotations.Transferable;
import org.jetbrains.annotations.Nullable;
@@ -36,28 +35,23 @@ public interface UpdateCommand extends PartitionCommand {
UUID rowUuid();
- @Nullable
- BinaryRowMessage rowMessage();
+ @Nullable TimedBinaryRowMessage messageRowToUpdate();
String txCoordinatorId();
/** Returns the row to update or {@code null} if the row should be
removed. */
- @Nullable
- default BinaryRow row() {
- BinaryRowMessage message = rowMessage();
+ default @Nullable BinaryRow rowToUpdate() {
+ TimedBinaryRowMessage tsRoMsg = messageRowToUpdate();
- return message == null ? null : message.asBinaryRow();
+ return tsRoMsg == null ? null : tsRoMsg.binaryRow();
}
- /**
- * Returns the timestamp of the last committed entry.
- */
- long lastCommitTimestampLong();
-
/**
* Returns the timestamp of the last committed entry.
*/
default @Nullable HybridTimestamp lastCommitTimestamp() {
- return nullableHybridTimestamp(lastCommitTimestampLong());
+ TimedBinaryRowMessage tsRoMsg = messageRowToUpdate();
+
+ return tsRoMsg == null ? null :
nullableHybridTimestamp(tsRoMsg.timestamp());
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 9456db2c27..3713402199 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -235,7 +235,11 @@ public class PartitionListener implements
RaftGroupListener {
// TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Proper
storage/raft index handling is required.
synchronized (safeTime) {
if (cmd.safeTime().compareTo(safeTime.current()) > 0) {
- storageUpdateHandler.handleUpdate(cmd.txId(), cmd.rowUuid(),
cmd.tablePartitionId().asTablePartitionId(), cmd.row(),
+ storageUpdateHandler.handleUpdate(
+ cmd.txId(),
+ cmd.rowUuid(),
+ cmd.tablePartitionId().asTablePartitionId(),
+ cmd.rowToUpdate(),
!cmd.full(),
() -> storage.lastApplied(commandIndex, commandTerm),
cmd.full() ? cmd.safeTime() : null,
@@ -264,11 +268,13 @@ public class PartitionListener implements
RaftGroupListener {
// TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Proper
storage/raft index handling is required.
synchronized (safeTime) {
if (cmd.safeTime().compareTo(safeTime.current()) > 0) {
- storageUpdateHandler.handleUpdateAll(cmd.txId(),
cmd.rowsToUpdate(), cmd.tablePartitionId().asTablePartitionId(),
+ storageUpdateHandler.handleUpdateAll(
+ cmd.txId(),
+ cmd.rowsToUpdate(),
+ cmd.tablePartitionId().asTablePartitionId(),
!cmd.full(),
() -> storage.lastApplied(commandIndex, commandTerm),
- cmd.full() ? cmd.safeTime() : null,
- cmd.lastCommitTimestamps()
+ cmd.full() ? cmd.safeTime() : null
);
updateTrackerIgnoringTrackerClosedException(safeTime,
cmd.safeTime());
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 94a16bd373..1ca4fd8b1e 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.table.distributed.replicator;
import static java.util.Collections.emptyList;
-import static java.util.Collections.emptyMap;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
@@ -26,6 +25,7 @@ import static
java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
+import static
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.tx.TxState.ABANDONED;
import static org.apache.ignite.internal.tx.TxState.ABORTED;
@@ -50,7 +50,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
@@ -115,6 +114,8 @@ import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage
import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
import
org.apache.ignite.internal.table.distributed.command.FinishTxCommandBuilder;
import
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
+import
org.apache.ignite.internal.table.distributed.command.TimedBinaryRowMessage;
+import
org.apache.ignite.internal.table.distributed.command.TimedBinaryRowMessageBuilder;
import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
@@ -2006,7 +2007,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
return allOf(deleteExactLockFuts).thenCompose(ignore -> {
- Map<UUID, BinaryRowMessage> rowIdsToDelete = new
HashMap<>();
+ Map<UUID, TimedBinaryRowMessage> rowIdsToDelete = new
HashMap<>();
// TODO:IGNITE-20669 Replace the result to BitSet.
Collection<BinaryRow> result = new ArrayList<>();
List<RowId> rows = new ArrayList<>();
@@ -2015,7 +2016,9 @@ public class PartitionReplicaListener implements
ReplicaListener {
RowId lockedRowId = deleteExactLockFuts[i].join();
if (lockedRowId != null) {
- rowIdsToDelete.put(lockedRowId.uuid(), null);
+ rowIdsToDelete.put(lockedRowId.uuid(),
MSG_FACTORY.timedBinaryRowMessage()
+
.timestamp(hybridTimestampToLong(lastCommitTimes.get(lockedRowId.uuid())))
+ .build());
result.add(new NullBinaryRow());
@@ -2035,7 +2038,6 @@ public class PartitionReplicaListener implements
ReplicaListener {
catalogVersion -> applyUpdateAllCommand(
request,
rowIdsToDelete,
- lastCommitTimes,
txCoordinatorId,
catalogVersion
)
@@ -2087,10 +2089,12 @@ public class PartitionReplicaListener implements
ReplicaListener {
insertLockFuts[idx++] =
takeLocksForInsert(entry.getValue(), entry.getKey(), txId);
}
- Map<UUID, BinaryRowMessage> convertedMap =
rowsToInsert.entrySet().stream()
+ Map<UUID, TimedBinaryRowMessage> convertedMap =
rowsToInsert.entrySet().stream()
.collect(toMap(
e -> e.getKey().uuid(),
- e -> binaryRowMessage(e.getValue())
+ e -> MSG_FACTORY.timedBinaryRowMessage()
+
.binaryRowMessage(binaryRowMessage(e.getValue()))
+ .build()
));
return allOf(insertLockFuts)
@@ -2101,7 +2105,6 @@ public class PartitionReplicaListener implements
ReplicaListener {
.thenCompose(catalogVersion ->
applyUpdateAllCommand(
request,
convertedMap,
- emptyMap(),
txCoordinatorId,
catalogVersion
)
@@ -2141,13 +2144,17 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
return allOf(rowIdFuts).thenCompose(ignore -> {
- Map<UUID, BinaryRowMessage> rowsToUpdate =
IgniteUtils.newHashMap(searchRows.size());
+ Map<UUID, TimedBinaryRowMessage> rowsToUpdate =
IgniteUtils.newHashMap(searchRows.size());
List<RowId> rows = new ArrayList<>();
for (int i = 0; i < searchRows.size(); i++) {
RowId lockedRow = rowIdFuts[i].join().get1();
- rowsToUpdate.put(lockedRow.uuid(),
binaryRowMessage(searchRows.get(i)));
+ rowsToUpdate.put(lockedRow.uuid(),
+ MSG_FACTORY.timedBinaryRowMessage()
+
.binaryRowMessage(binaryRowMessage(searchRows.get(i)))
+
.timestamp(hybridTimestampToLong(lastCommitTimes.get(lockedRow.uuid())))
+ .build());
rows.add(lockedRow);
}
@@ -2162,7 +2169,6 @@ public class PartitionReplicaListener implements
ReplicaListener {
catalogVersion -> applyUpdateAllCommand(
request,
rowsToUpdate,
- lastCommitTimes,
txCoordinatorId,
catalogVersion
)
@@ -2251,7 +2257,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
return allOf(rowIdLockFuts).thenCompose(ignore -> {
- Map<UUID, BinaryRowMessage> rowIdsToDelete = new
HashMap<>();
+ Map<UUID, TimedBinaryRowMessage> rowIdsToDelete = new
HashMap<>();
// TODO:IGNITE-20669 Replace the result to BitSet.
Collection<BinaryRow> result = new ArrayList<>();
List<RowId> rows = new ArrayList<>();
@@ -2260,7 +2266,9 @@ public class PartitionReplicaListener implements
ReplicaListener {
RowId lockedRowId = lockFut.join();
if (lockedRowId != null) {
- rowIdsToDelete.put(lockedRowId.uuid(), null);
+ rowIdsToDelete.put(lockedRowId.uuid(),
MSG_FACTORY.timedBinaryRowMessage()
+
.timestamp(hybridTimestampToLong(lastCommitTimes.get(lockedRowId.uuid())))
+ .build());
rows.add(lockedRowId);
@@ -2279,7 +2287,6 @@ public class PartitionReplicaListener implements
ReplicaListener {
.thenCompose(
catalogVersion -> applyUpdateAllCommand(
rowIdsToDelete,
- lastCommitTimes,
request.commitPartitionId(),
request.transactionId(),
request.full(),
@@ -2384,7 +2391,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
cmd.txId(),
cmd.rowUuid(),
cmd.tablePartitionId().asTablePartitionId(),
- cmd.row(),
+ cmd.rowToUpdate(),
true,
null,
null,
@@ -2409,7 +2416,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
cmd.txId(),
cmd.rowUuid(),
cmd.tablePartitionId().asTablePartitionId(),
- cmd.row(),
+ cmd.rowToUpdate(),
false,
null,
cmd.safeTime(),
@@ -2455,8 +2462,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
/**
* Executes an UpdateAll command.
*
- * @param rowsToUpdate All {@link BinaryRow}s represented as {@link
ByteBuffer}s to be updated.
- * @param lastCommitTimes All timestamps of the last committed entries for
each row.
+ * @param rowsToUpdate All {@link BinaryRow}s represented as {@link
TimedBinaryRowMessage}s to be updated.
* @param commitPartitionId Partition ID that these rows belong to.
* @param transactionId Transaction ID.
* @param full {@code true} if this is a single-command transaction.
@@ -2466,8 +2472,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @return Raft future, see {@link
#applyCmdWithExceptionHandling(Command)}.
*/
private CompletableFuture<CompletableFuture<?>> applyUpdateAllCommand(
- Map<UUID, BinaryRowMessage> rowsToUpdate,
- Map<UUID, HybridTimestamp> lastCommitTimes,
+ Map<UUID, TimedBinaryRowMessage> rowsToUpdate,
TablePartitionIdMessage commitPartitionId,
UUID transactionId,
boolean full,
@@ -2478,7 +2483,6 @@ public class PartitionReplicaListener implements
ReplicaListener {
synchronized (commandProcessingLinearizationMutex) {
UpdateAllCommand cmd = updateAllCommand(
rowsToUpdate,
- lastCommitTimes,
commitPartitionId,
transactionId,
hybridClock.now(),
@@ -2498,8 +2502,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
cmd.tablePartitionId().asTablePartitionId(),
true,
null,
- null,
- emptyMap());
+ null
+ );
updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime());
}
@@ -2525,8 +2529,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
cmd.tablePartitionId().asTablePartitionId(),
true,
null,
- null,
- emptyMap());
+ null
+ );
updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime());
}
@@ -2548,8 +2552,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
cmd.tablePartitionId().asTablePartitionId(),
false,
null,
- cmd.safeTime(),
- emptyMap());
+ cmd.safeTime()
+ );
return null;
});
@@ -2561,22 +2565,19 @@ public class PartitionReplicaListener implements
ReplicaListener {
* Executes an UpdateAll command.
*
* @param request Read write multi rows replica request.
- * @param rowsToUpdate All {@link BinaryRow}s represented as {@link
ByteBuffer}s to be updated.
- * @param lastCommitTimes All timestamps of the last committed entries for
each row.
+ * @param rowsToUpdate All {@link BinaryRow}s represented as {@link
TimedBinaryRowMessage}s to be updated.
* @param txCoordinatorId Transaction coordinator id.
* @param catalogVersion Validated catalog version associated with given
operation.
* @return Raft future, see {@link
#applyCmdWithExceptionHandling(Command)}.
*/
private CompletableFuture<CompletableFuture<?>> applyUpdateAllCommand(
ReadWriteMultiRowReplicaRequest request,
- Map<UUID, BinaryRowMessage> rowsToUpdate,
- Map<UUID, HybridTimestamp> lastCommitTimes,
+ Map<UUID, TimedBinaryRowMessage> rowsToUpdate,
String txCoordinatorId,
int catalogVersion
) {
return applyUpdateAllCommand(
rowsToUpdate,
- lastCommitTimes,
request.commitPartitionId(),
request.transactionId(),
request.full(),
@@ -3433,12 +3434,18 @@ public class PartitionReplicaListener implements
ReplicaListener {
.txCoordinatorId(txCoordinatorId)
.requiredCatalogVersion(catalogVersion);
- if (lastCommitTimestamp != null) {
- bldr.lastCommitTimestampLong(lastCommitTimestamp.longValue());
- }
+ if (lastCommitTimestamp != null || row != null) {
+ TimedBinaryRowMessageBuilder rowMsgBldr =
MSG_FACTORY.timedBinaryRowMessage();
+
+ if (lastCommitTimestamp != null) {
+ rowMsgBldr.timestamp(lastCommitTimestamp.longValue());
+ }
+
+ if (row != null) {
+ rowMsgBldr.binaryRowMessage(binaryRowMessage(row));
+ }
- if (row != null) {
- bldr.rowMessage(binaryRowMessage(row));
+ bldr.messageRowToUpdate(rowMsgBldr.build());
}
return bldr.build();
@@ -3452,8 +3459,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
private static UpdateAllCommand updateAllCommand(
- Map<UUID, BinaryRowMessage> rowsToUpdate,
- Map<UUID, HybridTimestamp> lastCommitTimes,
+ Map<UUID, TimedBinaryRowMessage> rowsToUpdate,
TablePartitionIdMessage commitPartitionId,
UUID transactionId,
HybridTimestamp safeTimeTimestamp,
@@ -3463,18 +3469,12 @@ public class PartitionReplicaListener implements
ReplicaListener {
) {
return MSG_FACTORY.updateAllCommand()
.tablePartitionId(commitPartitionId)
- .rowsToUpdate(rowsToUpdate)
+ .messageRowsToUpdate(rowsToUpdate)
.txId(transactionId)
.safeTimeLong(safeTimeTimestamp.longValue())
.full(full)
.txCoordinatorId(txCoordinatorId)
.requiredCatalogVersion(catalogVersion)
- .lastCommitTimestampsLong(
- // Also make sure lastCommitTimes contains only those
entries that match rowsToUpdate.
- lastCommitTimes.entrySet().stream()
- .filter(entry ->
rowsToUpdate.containsKey(entry.getKey()))
- .collect(toMap(Entry::getKey, entry ->
entry.getValue().longValue()))
- )
.build();
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TimedBinaryRow.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TimedBinaryRow.java
index 2dee94ab59..fcecf5cf39 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TimedBinaryRow.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TimedBinaryRow.java
@@ -24,13 +24,13 @@ import org.jetbrains.annotations.Nullable;
/**
* Row with time.
*/
-class TimedBinaryRow {
+public class TimedBinaryRow {
private final @Nullable BinaryRow binaryRow;
private final @Nullable HybridTimestamp commitTimestamp;
- TimedBinaryRow(@Nullable BinaryRow binaryRow, @Nullable HybridTimestamp
commitTimestamp) {
+ public TimedBinaryRow(@Nullable BinaryRow binaryRow, @Nullable
HybridTimestamp commitTimestamp) {
this.binaryRow = binaryRow;
this.commitTimestamp = commitTimestamp;
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
index bcbed62f92..dfe0808399 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.table.distributed;
-import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static java.util.stream.Collectors.toList;
import static org.mockito.Mockito.mock;
@@ -49,6 +48,7 @@ import
org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
+import org.apache.ignite.internal.table.distributed.replicator.TimedBinaryRow;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.internal.util.Cursor;
@@ -254,12 +254,11 @@ public abstract class IndexBaseTest extends
BaseMvStoragesTest {
handler.handleUpdateAll(
TX_ID,
- singletonMap(rowUuid, rowMessage),
+ singletonMap(rowUuid, new TimedBinaryRow(rowMessage ==
null ? null : rowMessage.asBinaryRow(), null)),
partitionId,
true,
null,
- null,
- emptyMap()
+ null
);
}
};
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
index 1934ed5477..e3913313e2 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.table.distributed;
-import static java.util.Collections.emptyMap;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -140,8 +139,7 @@ public class StorageUpdateHandlerTest extends
BaseIgniteAbstractTest {
new TablePartitionId(TABLE_ID, PARTITION_ID),
false,
null,
- null,
- emptyMap()
+ null
);
verify(partitionStorage).peek(lwm);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
index 307189f1e0..8acabb11f3 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.table.distributed.command;
+import static
org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP;
+import static
org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -51,6 +53,9 @@ import org.junit.jupiter.api.Test;
* Test for partition RAFT commands serialization.
*/
public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest
{
+ /** Hybrid clock. */
+ private final HybridClockImpl clock = new HybridClockImpl();
+
/** Key-value marshaller for tests. */
private static KvMarshaller<TestKey, TestValue> kvMarshaller;
@@ -81,7 +86,9 @@ public class PartitionRaftCommandsSerializationTest extends
IgniteAbstractTest {
.build()
)
.rowUuid(UUID.randomUUID())
- .rowMessage(binaryRowMessage(1))
+ .messageRowToUpdate(msgFactory.timedBinaryRowMessage()
+ .binaryRowMessage(binaryRowMessage(1))
+ .build())
.txId(TestTransactionIds.newTransactionId())
.txCoordinatorId(UUID.randomUUID().toString())
.build();
@@ -108,15 +115,21 @@ public class PartitionRaftCommandsSerializationTest
extends IgniteAbstractTest {
assertEquals(cmd.txId(), readCmd.txId());
assertEquals(cmd.rowUuid(), readCmd.rowUuid());
- assertNull(readCmd.rowMessage());
+ assertNull(readCmd.rowToUpdate());
}
@Test
public void testUpdateAllCommand() throws Exception {
- Map<UUID, BinaryRowMessage> rowsToUpdate = new HashMap<>();
+ Map<UUID, TimedBinaryRowMessage> rowsToUpdate = new HashMap<>();
for (int i = 0; i < 10; i++) {
- rowsToUpdate.put(TestTransactionIds.newTransactionId(),
binaryRowMessage(i));
+ rowsToUpdate.put(
+ TestTransactionIds.newTransactionId(),
+ msgFactory.timedBinaryRowMessage()
+ .binaryRowMessage(binaryRowMessage(i))
+ .timestamp(i % 2 == 0 ? clock.nowLong() :
NULL_HYBRID_TIMESTAMP)
+ .build()
+ );
}
var cmd = msgFactory.updateAllCommand()
@@ -125,7 +138,7 @@ public class PartitionRaftCommandsSerializationTest extends
IgniteAbstractTest {
.partitionId(1)
.build()
)
- .rowsToUpdate(rowsToUpdate)
+ .messageRowsToUpdate(rowsToUpdate)
.txId(UUID.randomUUID())
.txCoordinatorId(UUID.randomUUID().toString())
.build();
@@ -134,22 +147,28 @@ public class PartitionRaftCommandsSerializationTest
extends IgniteAbstractTest {
assertEquals(cmd.txId(), readCmd.txId());
- for (Map.Entry<UUID, BinaryRowMessage> entry :
cmd.rowsToUpdate().entrySet()) {
+ for (Map.Entry<UUID, TimedBinaryRowMessage> entry :
cmd.messageRowsToUpdate().entrySet()) {
assertTrue(readCmd.rowsToUpdate().containsKey(entry.getKey()));
- var readVal = readCmd.rowsToUpdate().get(entry.getKey());
- var val = entry.getValue();
+ var readVal =
readCmd.rowsToUpdate().get(entry.getKey()).binaryRow();
+ var val = entry.getValue().binaryRow();
assertEquals(val, readVal);
+
+ var readTs =
readCmd.rowsToUpdate().get(entry.getKey()).commitTimestamp();
+ var ts = nullableHybridTimestamp(entry.getValue().timestamp());
+
+ assertEquals(ts, readTs);
}
}
@Test
public void testRemoveAllCommand() throws Exception {
- Map<UUID, BinaryRowMessage> rowsToRemove = new HashMap<>();
+ Map<UUID, TimedBinaryRowMessage> rowsToRemove = new HashMap<>();
for (int i = 0; i < 10; i++) {
- rowsToRemove.put(TestTransactionIds.newTransactionId(), null);
+ rowsToRemove.put(TestTransactionIds.newTransactionId(),
msgFactory.timedBinaryRowMessage()
+ .build());
}
var cmd = msgFactory.updateAllCommand()
@@ -158,7 +177,7 @@ public class PartitionRaftCommandsSerializationTest extends
IgniteAbstractTest {
.partitionId(1)
.build()
)
- .rowsToUpdate(rowsToRemove)
+ .messageRowsToUpdate(rowsToRemove)
.txId(UUID.randomUUID())
.txCoordinatorId(UUID.randomUUID().toString())
.build();
@@ -167,9 +186,9 @@ public class PartitionRaftCommandsSerializationTest extends
IgniteAbstractTest {
assertEquals(cmd.txId(), readCmd.txId());
- for (UUID uuid : cmd.rowsToUpdate().keySet()) {
+ for (UUID uuid : cmd.messageRowsToUpdate().keySet()) {
assertTrue(readCmd.rowsToUpdate().containsKey(uuid));
- assertNull(readCmd.rowsToUpdate().get(uuid));
+ assertNull(readCmd.rowsToUpdate().get(uuid).binaryRow());
}
}
@@ -248,7 +267,7 @@ public class PartitionRaftCommandsSerializationTest extends
IgniteAbstractTest {
.txId(updateCommand.txId())
.rowUuid(updateCommand.rowUuid())
.tablePartitionId(updateCommand.tablePartitionId())
- .rowMessage(updateCommand.rowMessage())
+ .messageRowToUpdate(updateCommand.messageRowToUpdate())
.txCoordinatorId(updateCommand.txCoordinatorId())
.build();
} else if (cmd instanceof UpdateAllCommand) {
@@ -256,7 +275,7 @@ public class PartitionRaftCommandsSerializationTest extends
IgniteAbstractTest {
return (T) msgFactory.updateAllCommand()
.txId(updateCommand.txId())
- .rowsToUpdate(updateCommand.rowsToUpdate())
+ .messageRowsToUpdate(updateCommand.messageRowsToUpdate())
.tablePartitionId(updateCommand.tablePartitionId())
.txCoordinatorId(updateCommand.txCoordinatorId())
.build();
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index 4d41882d2c..3773ccea66 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -90,6 +90,7 @@ import
org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
+import
org.apache.ignite.internal.table.distributed.command.TimedBinaryRowMessage;
import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
@@ -615,12 +616,17 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
* Inserts all rows.
*/
private void insertAll() {
- Map<UUID, BinaryRowMessage> rows = new HashMap<>(KEY_COUNT);
+ Map<UUID, TimedBinaryRowMessage> rows = new HashMap<>(KEY_COUNT);
UUID txId = TestTransactionIds.newTransactionId();
var commitPartId = new TablePartitionId(TABLE_ID, PARTITION_ID);
for (int i = 0; i < KEY_COUNT; i++) {
- rows.put(TestTransactionIds.newTransactionId(), getTestRow(i, i));
+ rows.put(
+ TestTransactionIds.newTransactionId(),
+ msgFactory.timedBinaryRowMessage()
+ .binaryRowMessage(getTestRow(i, i))
+ .build()
+ );
}
HybridTimestamp commitTimestamp = hybridClock.now();
@@ -631,7 +637,7 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
.tableId(commitPartId.tableId())
.partitionId(commitPartId.partitionId())
.build())
- .rowsToUpdate(rows)
+ .messageRowsToUpdate(rows)
.txId(txId)
.safeTimeLong(hybridClock.nowLong())
.txCoordinatorId(UUID.randomUUID().toString())
@@ -654,12 +660,16 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
private void updateAll(Function<Integer, Integer> keyValueMapper) {
UUID txId = TestTransactionIds.newTransactionId();
var commitPartId = new TablePartitionId(TABLE_ID, PARTITION_ID);
- Map<UUID, BinaryRowMessage> rows = new HashMap<>(KEY_COUNT);
+ Map<UUID, TimedBinaryRowMessage> rows = new HashMap<>(KEY_COUNT);
for (int i = 0; i < KEY_COUNT; i++) {
ReadResult readResult = readRow(getTestKey(i));
- rows.put(readResult.rowId().uuid(), getTestRow(i,
keyValueMapper.apply(i)));
+ rows.put(readResult.rowId().uuid(),
+ msgFactory.timedBinaryRowMessage()
+ .binaryRowMessage(getTestRow(i,
keyValueMapper.apply(i)))
+ .build()
+ );
}
HybridTimestamp commitTimestamp = hybridClock.now();
@@ -670,7 +680,7 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
.tableId(commitPartId.tableId())
.partitionId(commitPartId.partitionId())
.build())
- .rowsToUpdate(rows)
+ .messageRowsToUpdate(rows)
.txId(txId)
.safeTimeLong(hybridClock.nowLong())
.txCoordinatorId(UUID.randomUUID().toString())
@@ -691,12 +701,13 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
private void deleteAll() {
UUID txId = TestTransactionIds.newTransactionId();
var commitPartId = new TablePartitionId(TABLE_ID, PARTITION_ID);
- Map<UUID, BinaryRowMessage> keyRows = new HashMap<>(KEY_COUNT);
+ Map<UUID, TimedBinaryRowMessage> keyRows = new HashMap<>(KEY_COUNT);
for (int i = 0; i < KEY_COUNT; i++) {
ReadResult readResult = readRow(getTestKey(i));
- keyRows.put(readResult.rowId().uuid(), null);
+ keyRows.put(readResult.rowId().uuid(),
msgFactory.timedBinaryRowMessage()
+ .build());
}
HybridTimestamp commitTimestamp = hybridClock.now();
@@ -707,7 +718,7 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
.tableId(commitPartId.tableId())
.partitionId(commitPartId.partitionId())
.build())
- .rowsToUpdate(keyRows)
+ .messageRowsToUpdate(keyRows)
.txId(txId)
.safeTimeLong(hybridClock.nowLong())
.txCoordinatorId(UUID.randomUUID().toString())
@@ -745,7 +756,9 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
.tableId(1)
.partitionId(PARTITION_ID).build())
.rowUuid(readResult.rowId().uuid())
- .rowMessage(row)
+
.messageRowToUpdate(msgFactory.timedBinaryRowMessage()
+ .binaryRowMessage(row)
+ .build())
.txId(txId)
.safeTimeLong(hybridClock.nowLong())
.txCoordinatorId(UUID.randomUUID().toString())
@@ -862,7 +875,9 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
.tableId(1)
.partitionId(PARTITION_ID).build())
.rowUuid(UUID.randomUUID())
- .rowMessage(getTestRow(i, i))
+
.messageRowToUpdate(msgFactory.timedBinaryRowMessage()
+ .binaryRowMessage(getTestRow(i, i))
+ .build())
.txId(txId)
.safeTimeLong(hybridClock.nowLong())
.txCoordinatorId(UUID.randomUUID().toString())