This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 209bac0c5f IGNITE-20727 Pass schema version in each read/write
ReplicaRequest (#2747)
209bac0c5f is described below
commit 209bac0c5f7111ef2ec1cb8c2a33625c92cddd14
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Oct 25 13:21:18 2023 +0400
IGNITE-20727 Pass schema version in each read/write ReplicaRequest (#2747)
---
.../message/SchemaVersionAwareReplicaRequest.java} | 19 +--
.../sql/engine/exec/UpdatableTableImpl.java | 19 +--
.../ItInternalTableReadOnlyOperationsTest.java | 30 ++--
.../ignite/distributed/ItTablePersistenceTest.java | 7 +-
.../ignite/distributed/ReplicaUnavailableTest.java | 58 +++----
.../ignite/internal/table/ItColocationTest.java | 18 +-
.../request/MultipleRowPkReplicaRequest.java | 4 +-
.../request/MultipleRowReplicaRequest.java | 21 +--
.../request/SingleRowPkReplicaRequest.java | 4 +-
.../request/SingleRowReplicaRequest.java | 10 +-
.../replication/request/SwapRowReplicaRequest.java | 16 +-
.../replicator/PartitionReplicaListener.java | 35 ++--
.../distributed/storage/InternalTableImpl.java | 189 +++++++++++++--------
.../PartitionReplicaListenerIndexLockingTest.java | 9 +-
.../replication/PartitionReplicaListenerTest.java | 64 ++++---
15 files changed, 285 insertions(+), 218 deletions(-)
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowPkReplicaRequest.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/SchemaVersionAwareReplicaRequest.java
similarity index 60%
copy from
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowPkReplicaRequest.java
copy to
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/SchemaVersionAwareReplicaRequest.java
index e60076022d..03a79787a6 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowPkReplicaRequest.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/SchemaVersionAwareReplicaRequest.java
@@ -15,19 +15,14 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.table.distributed.replication.request;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
-import
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
-import org.apache.ignite.network.annotations.Marshallable;
+package org.apache.ignite.internal.replicator.message;
/**
- * Single row replica request involving a table's Primary Key.
+ * Requests that are aware about the schema version that must be used when
processing them.
*/
-public interface SingleRowPkReplicaRequest extends ReplicaRequest {
- ByteBuffer primaryKey();
-
- @Marshallable
- RequestType requestType();
+public interface SchemaVersionAwareReplicaRequest extends ReplicaRequest {
+ /**
+ * Schema version in which input values are marshalled and that must be
used when processing the request.
+ */
+ int schemaVersion();
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
index d1b92dbe4f..9272cc1123 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
@@ -52,7 +52,6 @@ import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.sql.engine.util.TypeUtils;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
-import
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.internal.table.distributed.storage.RowBatch;
@@ -189,7 +188,8 @@ public final class UpdatableTableImpl implements
UpdatableTable {
ReplicaRequest request =
MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
.groupId(partGroupId)
.commitPartitionId(serializeTablePartitionId(commitPartitionId))
-
.binaryRowMessages(serializeBinaryRows(partToRows.getValue()))
+
.schemaVersion(partToRows.getValue().get(0).schemaVersion())
+ .binaryTuples(binaryRowsToBuffers(partToRows.getValue()))
.transactionId(txAttributes.id())
.term(nodeWithTerm.term())
.requestType(RequestType.RW_UPSERT_ALL)
@@ -203,16 +203,11 @@ public final class UpdatableTableImpl implements
UpdatableTable {
return CompletableFuture.allOf(futures);
}
- private static List<BinaryRowMessage>
serializeBinaryRows(Collection<BinaryRow> rows) {
- var result = new ArrayList<BinaryRowMessage>(rows.size());
+ private static List<ByteBuffer> binaryRowsToBuffers(Collection<BinaryRow>
rows) {
+ var result = new ArrayList<ByteBuffer>(rows.size());
for (BinaryRow row : rows) {
- BinaryRowMessage message = MESSAGES_FACTORY.binaryRowMessage()
- .binaryTuple(row.tupleSlice())
- .schemaVersion(row.schemaVersion())
- .build();
-
- result.add(message);
+ result.add(row.tupleSlice());
}
return result;
@@ -269,7 +264,8 @@ public final class UpdatableTableImpl implements
UpdatableTable {
ReadWriteMultiRowReplicaRequest request =
MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
.groupId(partGroupId)
.commitPartitionId(serializeTablePartitionId(commitPartitionId))
-
.binaryRowMessages(serializeBinaryRows(rowBatch.requestedRows))
+
.schemaVersion(rowBatch.requestedRows.get(0).schemaVersion())
+ .binaryTuples(binaryRowsToBuffers(rowBatch.requestedRows))
.transactionId(txAttributes.id())
.term(nodeWithTerm.term())
.requestType(RequestType.RW_INSERT_ALL)
@@ -337,6 +333,7 @@ public final class UpdatableTableImpl implements
UpdatableTable {
ReplicaRequest request =
MESSAGES_FACTORY.readWriteMultiRowPkReplicaRequest()
.groupId(partGroupId)
.commitPartitionId(serializeTablePartitionId(commitPartitionId))
+
.schemaVersion(partToRows.getValue().get(0).schemaVersion())
.primaryKeys(serializePrimaryKeys(partToRows.getValue()))
.transactionId(txAttributes.id())
.term(nodeWithTerm.term())
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
index 0f383329b7..bded985578 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowConverter;
+import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.ColumnsExtractor;
import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -103,6 +104,9 @@ public class ItInternalTableReadOnlyOperationsTest extends
IgniteAbstractTest {
/** Internal table to test. */
private InternalTable internalTbl;
+ @Mock
+ private BinaryRowEx someRow;
+
/**
* Prepare test environment using DummyInternalTableImpl and Mocked
storage.
*/
@@ -238,19 +242,19 @@ public class ItInternalTableReadOnlyOperationsTest
extends IgniteAbstractTest {
when(tx.isReadOnly()).thenReturn(true);
List<Executable> executables = List.of(
- () -> internalTbl.delete(null, tx).get(),
- () -> internalTbl.deleteAll(null, tx).get(),
- () -> internalTbl.deleteExact(null, tx).get(),
- () -> internalTbl.deleteAllExact(null, tx).get(),
- () -> internalTbl.getAndDelete(null, tx).get(),
- () -> internalTbl.getAndReplace(null, tx).get(),
- () -> internalTbl.getAndUpsert(null, tx).get(),
- () -> internalTbl.upsert(null, tx).get(),
- () -> internalTbl.upsertAll(null, tx).get(),
- () -> internalTbl.insert(null, tx).get(),
- () -> internalTbl.insertAll(null, tx).get(),
- () -> internalTbl.replace(null, tx).get(),
- () -> internalTbl.replace(null, null, tx).get()
+ () -> internalTbl.delete(someRow, tx).get(),
+ () -> internalTbl.deleteAll(List.of(someRow), tx).get(),
+ () -> internalTbl.deleteExact(someRow, tx).get(),
+ () -> internalTbl.deleteAllExact(List.of(someRow), tx).get(),
+ () -> internalTbl.getAndDelete(someRow, tx).get(),
+ () -> internalTbl.getAndReplace(someRow, tx).get(),
+ () -> internalTbl.getAndUpsert(someRow, tx).get(),
+ () -> internalTbl.upsert(someRow, tx).get(),
+ () -> internalTbl.upsertAll(List.of(someRow), tx).get(),
+ () -> internalTbl.insert(someRow, tx).get(),
+ () -> internalTbl.insertAll(List.of(someRow), tx).get(),
+ () -> internalTbl.replace(someRow, tx).get(),
+ () -> internalTbl.replace(someRow, someRow, tx).get()
);
executables.forEach(executable -> {
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index 537754ccc6..1297653efc 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -310,7 +310,12 @@ public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<Parti
.txId(req0.transactionId())
.tablePartitionId(tablePartitionId(new
TablePartitionId(1, 0)))
.rowUuid(new RowId(0).uuid())
- .rowMessage(req0.binaryRowMessage())
+ .rowMessage(
+ msgFactory.binaryRowMessage()
+ .schemaVersion(req0.schemaVersion())
+ .binaryTuple(req0.binaryTuple())
+ .build()
+ )
.safeTimeLong(hybridClock.nowLong())
.txCoordinatorId(UUID.randomUUID().toString())
.build();
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index 083b847f2a..0d8af5898e 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -61,7 +61,6 @@ import org.apache.ignite.internal.schema.row.RowAssembler;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
-import
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -149,14 +148,7 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
TablePartitionId tablePartitionId = new TablePartitionId(1, 1);
- ReadWriteSingleRowReplicaRequest request =
tableMessagesFactory.readWriteSingleRowReplicaRequest()
- .groupId(tablePartitionId)
- .transactionId(TestTransactionIds.newTransactionId())
- .commitPartitionId(tablePartitionId())
- .timestampLong(clock.nowLong())
- .binaryRowMessage(createKeyValueRow(1L, 1L))
- .requestType(RequestType.RW_GET)
- .build();
+ ReadWriteSingleRowReplicaRequest request =
getRequest(tablePartitionId);
clusterService.messagingService().addMessageHandler(ReplicaMessageGroup.class,
(message, sender, correlationId) -> {
@@ -185,20 +177,27 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
assertEquals(5, respFur.get().result());
}
- @Test
- public void testStopReplicaException() {
- ClusterNode clusterNode =
clusterService.topologyService().localMember();
+ private ReadWriteSingleRowReplicaRequest getRequest(TablePartitionId
tablePartitionId) {
+ BinaryRow binaryRow = createKeyValueRow(1L, 1L);
- TablePartitionId tablePartitionId = new TablePartitionId(1, 1);
-
- ReadWriteSingleRowReplicaRequest request =
tableMessagesFactory.readWriteSingleRowReplicaRequest()
+ return tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(tablePartitionId)
.transactionId(TestTransactionIds.newTransactionId())
.commitPartitionId(tablePartitionId())
.timestampLong(clock.nowLong())
- .binaryRowMessage(createKeyValueRow(1L, 1L))
+ .schemaVersion(binaryRow.schemaVersion())
+ .binaryTuple(binaryRow.tupleSlice())
.requestType(RequestType.RW_GET)
.build();
+ }
+
+ @Test
+ public void testStopReplicaException() {
+ ClusterNode clusterNode =
clusterService.topologyService().localMember();
+
+ TablePartitionId tablePartitionId = new TablePartitionId(1, 1);
+
+ ReadWriteSingleRowReplicaRequest request =
getRequest(tablePartitionId);
clusterService.messagingService().addMessageHandler(ReplicaMessageGroup.class,
(message, sender, correlationId) -> {
@@ -223,14 +222,7 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
TablePartitionId tablePartitionId = new TablePartitionId(1, 1);
- ReadWriteSingleRowReplicaRequest request =
tableMessagesFactory.readWriteSingleRowReplicaRequest()
- .groupId(tablePartitionId)
- .transactionId(TestTransactionIds.newTransactionId())
- .commitPartitionId(tablePartitionId())
- .timestampLong(clock.nowLong())
- .binaryRowMessage(createKeyValueRow(1L, 1L))
- .requestType(RequestType.RW_GET)
- .build();
+ ReadWriteSingleRowReplicaRequest request =
getRequest(tablePartitionId);
Exception e0 = null;
Exception e1 = null;
@@ -280,14 +272,7 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
}
);
- ReadWriteSingleRowReplicaRequest request =
tableMessagesFactory.readWriteSingleRowReplicaRequest()
- .groupId(tablePartitionId)
- .transactionId(TestTransactionIds.newTransactionId())
- .commitPartitionId(tablePartitionId())
- .timestampLong(clock.nowLong())
- .binaryRowMessage(createKeyValueRow(1L, 1L))
- .requestType(RequestType.RW_GET)
- .build();
+ ReadWriteSingleRowReplicaRequest request =
getRequest(tablePartitionId);
Exception e0 = null;
@@ -302,18 +287,13 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
assertEquals(REPLICA_TIMEOUT_ERR, ((ReplicationTimeoutException)
unwrapCause(e0)).code());
}
- private BinaryRowMessage createKeyValueRow(long id, long value) {
+ private BinaryRow createKeyValueRow(long id, long value) {
RowAssembler rowBuilder = new RowAssembler(SCHEMA);
rowBuilder.appendLong(id);
rowBuilder.appendLong(value);
- BinaryRow row = rowBuilder.build();
-
- return tableMessagesFactory.binaryRowMessage()
- .binaryTuple(row.tupleSlice())
- .schemaVersion(row.schemaVersion())
- .build();
+ return rowBuilder.build();
}
private TablePartitionIdMessage tablePartitionId() {
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index e7afd3ee1d..611d33fe36 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -36,6 +36,7 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.math.BigDecimal;
import java.math.BigInteger;
+import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
@@ -61,6 +62,7 @@ import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import
org.apache.ignite.internal.replicator.message.SchemaVersionAwareReplicaRequest;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NullBinaryRow;
@@ -204,8 +206,9 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
RaftGroupService r = groupRafts.get(request.groupId());
if (request instanceof ReadWriteMultiRowReplicaRequest) {
- Map<UUID, BinaryRowMessage> rows =
((ReadWriteMultiRowReplicaRequest) request).binaryRowMessages()
- .stream()
+ ReadWriteMultiRowReplicaRequest multiRowReplicaRequest =
(ReadWriteMultiRowReplicaRequest) request;
+ Map<UUID, BinaryRowMessage> rows =
multiRowReplicaRequest.binaryTuples().stream()
+ .map(tupleBuffer -> binaryRowMessage(tupleBuffer,
multiRowReplicaRequest))
.collect(toMap(row ->
TestTransactionIds.newTransactionId(), Function.identity()));
return r.run(MSG_FACTORY.updateAllCommand()
@@ -221,6 +224,8 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
} else {
assertThat(request,
is(instanceOf(ReadWriteSingleRowReplicaRequest.class)));
+ ReadWriteSingleRowReplicaRequest singleRowReplicaRequest =
(ReadWriteSingleRowReplicaRequest) request;
+
return r.run(MSG_FACTORY.updateCommand()
.tablePartitionId(
MSG_FACTORY.tablePartitionIdMessage()
@@ -229,7 +234,7 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
.build()
)
.rowUuid(UUID.randomUUID())
- .rowMessage(((ReadWriteSingleRowReplicaRequest)
request).binaryRowMessage())
+
.rowMessage(binaryRowMessage(singleRowReplicaRequest.binaryTuple(),
singleRowReplicaRequest))
.txId(TestTransactionIds.newTransactionId())
.txCoordinatorId(node.id())
.build());
@@ -252,6 +257,13 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
);
}
+ private static BinaryRowMessage binaryRowMessage(ByteBuffer tupleBuffer,
SchemaVersionAwareReplicaRequest request) {
+ return MSG_FACTORY.binaryRowMessage()
+ .schemaVersion(request.schemaVersion())
+ .binaryTuple(tupleBuffer)
+ .build();
+ }
+
@AfterAll
static void afterAllTests() throws Exception {
if (txManager != null) {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowPkReplicaRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowPkReplicaRequest.java
index be25a12970..8de1971b36 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowPkReplicaRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowPkReplicaRequest.java
@@ -19,14 +19,14 @@ package
org.apache.ignite.internal.table.distributed.replication.request;
import java.nio.ByteBuffer;
import java.util.List;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import
org.apache.ignite.internal.replicator.message.SchemaVersionAwareReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.network.annotations.Marshallable;
/**
* Multiple row replica request involving table's Primary Keys.
*/
-public interface MultipleRowPkReplicaRequest extends ReplicaRequest {
+public interface MultipleRowPkReplicaRequest extends
SchemaVersionAwareReplicaRequest {
List<ByteBuffer> primaryKeys();
@Marshallable
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowReplicaRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowReplicaRequest.java
index 43cbaf1188..6872aaac00 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowReplicaRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowReplicaRequest.java
@@ -17,32 +17,33 @@
package org.apache.ignite.internal.table.distributed.replication.request;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import
org.apache.ignite.internal.replicator.message.SchemaVersionAwareReplicaRequest;
import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowImpl;
import
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.network.annotations.Marshallable;
/**
* Multiple row replica request.
*/
-public interface MultipleRowReplicaRequest extends ReplicaRequest {
- List<BinaryRowMessage> binaryRowMessages();
+public interface MultipleRowReplicaRequest extends
SchemaVersionAwareReplicaRequest {
+ List<ByteBuffer> binaryTuples();
/**
- * Deserializes binary row byte buffers into binary rows.
+ * Returns {@link BinaryRow}s contained in the request.
*/
default List<BinaryRow> binaryRows() {
- List<BinaryRowMessage> binaryRowMessages = binaryRowMessages();
+ List<ByteBuffer> tuples = binaryTuples();
+ List<BinaryRow> rows = new ArrayList<>(tuples.size());
- var result = new ArrayList<BinaryRow>(binaryRowMessages.size());
-
- for (BinaryRowMessage message : binaryRowMessages) {
- result.add(message.asBinaryRow());
+ for (ByteBuffer tuple : tuples) {
+ rows.add(new BinaryRowImpl(schemaVersion(), tuple));
}
- return result;
+ return rows;
}
@Marshallable
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowPkReplicaRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowPkReplicaRequest.java
index e60076022d..3cb6e76b4d 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowPkReplicaRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowPkReplicaRequest.java
@@ -18,14 +18,14 @@
package org.apache.ignite.internal.table.distributed.replication.request;
import java.nio.ByteBuffer;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import
org.apache.ignite.internal.replicator.message.SchemaVersionAwareReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.network.annotations.Marshallable;
/**
* Single row replica request involving a table's Primary Key.
*/
-public interface SingleRowPkReplicaRequest extends ReplicaRequest {
+public interface SingleRowPkReplicaRequest extends
SchemaVersionAwareReplicaRequest {
ByteBuffer primaryKey();
@Marshallable
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java
index a2f033cf0e..a15fdf38d3 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java
@@ -17,19 +17,21 @@
package org.apache.ignite.internal.table.distributed.replication.request;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import java.nio.ByteBuffer;
+import
org.apache.ignite.internal.replicator.message.SchemaVersionAwareReplicaRequest;
import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowImpl;
import
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.network.annotations.Marshallable;
/**
* Single-row replica request.
*/
-public interface SingleRowReplicaRequest extends ReplicaRequest {
- BinaryRowMessage binaryRowMessage();
+public interface SingleRowReplicaRequest extends
SchemaVersionAwareReplicaRequest {
+ ByteBuffer binaryTuple();
default BinaryRow binaryRow() {
- return binaryRowMessage().asBinaryRow();
+ return new BinaryRowImpl(schemaVersion(), binaryTuple());
}
@Marshallable
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SwapRowReplicaRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SwapRowReplicaRequest.java
index 1b5a82f6ec..b9b6e1fd58 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SwapRowReplicaRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SwapRowReplicaRequest.java
@@ -17,25 +17,27 @@
package org.apache.ignite.internal.table.distributed.replication.request;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import java.nio.ByteBuffer;
+import
org.apache.ignite.internal.replicator.message.SchemaVersionAwareReplicaRequest;
import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowImpl;
import
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.network.annotations.Marshallable;
/**
* Dual row replica request.
*/
-public interface SwapRowReplicaRequest extends ReplicaRequest {
- BinaryRowMessage binaryRowMessage();
+public interface SwapRowReplicaRequest extends
SchemaVersionAwareReplicaRequest {
+ ByteBuffer newBinaryTuple();
- default BinaryRow binaryRow() {
- return binaryRowMessage().asBinaryRow();
+ default BinaryRow newBinaryRow() {
+ return new BinaryRowImpl(schemaVersion(), newBinaryTuple());
}
- BinaryRowMessage oldBinaryRowMessage();
+ ByteBuffer oldBinaryTuple();
default BinaryRow oldBinaryRow() {
- return oldBinaryRowMessage().asBinaryRow();
+ return new BinaryRowImpl(schemaVersion(), oldBinaryTuple());
}
@Marshallable
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index afb264d848..91e86650a2 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -93,6 +93,7 @@ import
org.apache.ignite.internal.replicator.message.ReadOnlyDirectReplicaReques
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import
org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
+import
org.apache.ignite.internal.replicator.message.SchemaVersionAwareReplicaRequest;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
@@ -458,6 +459,10 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
private CompletableFuture<?> processRequest(ReplicaRequest request,
@Nullable Boolean isPrimary, String senderId) {
+ if (request instanceof SchemaVersionAwareReplicaRequest) {
+ assert ((SchemaVersionAwareReplicaRequest)
request).schemaVersion() > 0 : "No schema version passed?";
+ }
+
if (request instanceof CommittableTxRequest) {
var req = (CommittableTxRequest) request;
@@ -479,7 +484,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @param request Request that's being processed.
*/
private CompletableFuture<Void> waitForSchemasBeforeReading(ReplicaRequest
request) {
- // TODO: IGNITE-20106 - validate that input rows schema version
matches the tx-bound schema version.
+ // TODO: IGNITE-20715 - validate that input rows schema version
matches the tx-bound schema version.
HybridTimestamp tsToWaitForSchemas;
@@ -2021,10 +2026,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
Map<UUID, BinaryRowMessage> convertedMap =
rowsToInsert.entrySet().stream()
.collect(toMap(
e -> e.getKey().uuid(),
- e -> MSG_FACTORY.binaryRowMessage()
-
.binaryTuple(e.getValue().tupleSlice())
-
.schemaVersion(e.getValue().schemaVersion())
- .build()
+ e -> binaryRowMessage(e.getValue())
));
return allOf(insertLockFuts)
@@ -2075,14 +2077,13 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
return allOf(rowIdFuts).thenCompose(ignore -> {
- List<BinaryRowMessage> searchRowMessages =
request.binaryRowMessages();
- Map<UUID, BinaryRowMessage> rowsToUpdate =
IgniteUtils.newHashMap(searchRowMessages.size());
+ Map<UUID, BinaryRowMessage> rowsToUpdate =
IgniteUtils.newHashMap(searchRows.size());
List<RowId> rows = new ArrayList<>();
- for (int i = 0; i < searchRowMessages.size(); i++) {
+ for (int i = 0; i < searchRows.size(); i++) {
RowId lockedRow = rowIdFuts[i].join().get1();
- rowsToUpdate.put(lockedRow.uuid(),
searchRowMessages.get(i));
+ rowsToUpdate.put(lockedRow.uuid(),
binaryRowMessage(searchRows.get(i)));
rows.add(lockedRow);
}
@@ -3024,7 +3025,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
ReadWriteSwapRowReplicaRequest request,
String txCoordinatorId
) {
- BinaryRow newRow = request.binaryRow();
+ BinaryRow newRow = request.newBinaryRow();
BinaryRow expectedRow = request.oldBinaryRow();
TablePartitionIdMessage commitPartitionId =
request.commitPartitionId();
@@ -3373,17 +3374,19 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
if (row != null) {
- BinaryRowMessage rowMessage = MSG_FACTORY.binaryRowMessage()
- .binaryTuple(row.tupleSlice())
- .schemaVersion(row.schemaVersion())
- .build();
-
- bldr.rowMessage(rowMessage);
+ bldr.rowMessage(binaryRowMessage(row));
}
return bldr.build();
}
+ private static BinaryRowMessage binaryRowMessage(BinaryRow row) {
+ return MSG_FACTORY.binaryRowMessage()
+ .binaryTuple(row.tupleSlice())
+ .schemaVersion(row.schemaVersion())
+ .build();
+ }
+
private static UpdateAllCommand updateAllCommand(
Map<UUID, BinaryRowMessage> rowsToUpdate,
Map<UUID, HybridTimestamp> lastCommitTimes,
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index cea79a364e..ab34b3265a 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -21,6 +21,7 @@ import static
it.unimi.dsi.fastutil.ints.Int2ObjectMaps.emptyMap;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_DELETE_ALL;
import static
org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_GET;
import static
org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_GET_ALL;
import static
org.apache.ignite.internal.table.distributed.storage.RowBatch.allResultFutures;
@@ -42,10 +43,12 @@ import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -87,12 +90,13 @@ import
org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
-import
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import
org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
import
org.apache.ignite.internal.table.distributed.replication.request.MultipleRowPkReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.MultipleRowReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowPkReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest;
+import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowPkReplicaRequest;
+import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequestBuilder;
import
org.apache.ignite.internal.table.distributed.replication.request.SingleRowPkReplicaRequest;
@@ -306,7 +310,9 @@ public class InternalTableImpl implements InternalTable {
private <T> CompletableFuture<T> enlistInTx(
Collection<BinaryRowEx> keyRows,
@Nullable InternalTransaction tx,
- IgnitePentaFunction<Collection<BinaryRow>, InternalTransaction,
ReplicationGroupId, Long, Boolean, ReplicaRequest> fac,
+ IgnitePentaFunction<
+ Collection<? extends BinaryRow>, InternalTransaction,
ReplicationGroupId, Long, Boolean, ReplicaRequest
+ > fac,
Function<Collection<RowBatch>, CompletableFuture<T>> reducer,
BiPredicate<T, ReplicaRequest> noOpChecker
) {
@@ -727,6 +733,7 @@ public class InternalTableImpl implements InternalTable {
(groupId, consistencyToken) ->
tableMessagesFactory.readOnlyDirectSingleRowReplicaRequest()
.groupId(groupId)
.enlistmentConsistencyToken(consistencyToken)
+ .schemaVersion(keyRow.schemaVersion())
.primaryKey(keyRow.tupleSlice())
.requestType(RequestType.RO_GET)
.build()
@@ -743,6 +750,7 @@ public class InternalTableImpl implements InternalTable {
tx,
(txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowPkReplicaRequest()
.groupId(groupId)
+ .schemaVersion(keyRow.schemaVersion())
.primaryKey(keyRow.tupleSlice())
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
.transactionId(txo.id())
@@ -766,6 +774,7 @@ public class InternalTableImpl implements InternalTable {
return replicaSvc.invoke(recipientNode,
tableMessagesFactory.readOnlySingleRowPkReplicaRequest()
.groupId(partGroupId)
+ .schemaVersion(keyRow.schemaVersion())
.primaryKey(keyRow.tupleSlice())
.requestType(RequestType.RO_GET)
.readTimestampLong(readTimestamp.longValue())
@@ -808,7 +817,8 @@ public class InternalTableImpl implements InternalTable {
(groupId, consistencyToken) ->
tableMessagesFactory.readOnlyDirectMultiRowReplicaRequest()
.groupId(groupId)
.enlistmentConsistencyToken(consistencyToken)
- .primaryKeys(serializePrimaryKeys(keyRows))
+
.schemaVersion(keyRows.iterator().next().schemaVersion())
+ .primaryKeys(serializeBinaryTuples(keyRows))
.requestType(RequestType.RO_GET_ALL)
.build()
);
@@ -824,16 +834,9 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
keyRows,
tx,
- (keyRows0, txo, groupId, term, full) ->
tableMessagesFactory.readWriteMultiRowPkReplicaRequest()
- .groupId(groupId)
- .primaryKeys(serializePrimaryKeys(keyRows0))
-
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
- .transactionId(txo.id())
- .term(term)
- .requestType(RW_GET_ALL)
- .timestampLong(clock.nowLong())
- .full(full)
- .build(),
+ (keyRows0, txo, groupId, term, full) -> {
+ return readWriteMultiRowPkReplicaRequest(RW_GET_ALL,
keyRows0, txo, groupId, term, full);
+ },
InternalTableImpl::collectMultiRowsResponsesWithRestoreOrder,
(res, req) -> false
);
@@ -853,7 +856,8 @@ public class InternalTableImpl implements InternalTable {
ReadOnlyMultiRowPkReplicaRequest request =
tableMessagesFactory.readOnlyMultiRowPkReplicaRequest()
.groupId(partGroupId)
-
.primaryKeys(serializePrimaryKeys(partitionRowBatch.getValue().requestedRows))
+
.schemaVersion(partitionRowBatch.getValue().requestedRows.get(0).schemaVersion())
+
.primaryKeys(serializeBinaryTuples(partitionRowBatch.getValue().requestedRows))
.requestType(RequestType.RO_GET_ALL)
.readTimestampLong(readTimestamp.longValue())
.build();
@@ -864,24 +868,60 @@ public class InternalTableImpl implements InternalTable {
return
collectMultiRowsResponsesWithRestoreOrder(rowBatchByPartitionId.values());
}
- private List<BinaryRowMessage> serializeBinaryRows(Collection<? extends
BinaryRow> rows) {
- var result = new ArrayList<BinaryRowMessage>(rows.size());
+ private ReadWriteMultiRowPkReplicaRequest
readWriteMultiRowPkReplicaRequest(
+ RequestType requestType,
+ Collection<? extends BinaryRow> rows,
+ InternalTransaction tx,
+ ReplicationGroupId groupId,
+ Long term,
+ boolean full
+ ) {
+ assert allSchemaVersionsSame(rows) : "Different schema versions
encountered: " + uniqueSchemaVersions(rows);
+
+ return tableMessagesFactory.readWriteMultiRowPkReplicaRequest()
+ .groupId(groupId)
+
.commitPartitionId(serializeTablePartitionId(tx.commitPartition()))
+ .schemaVersion(rows.iterator().next().schemaVersion())
+ .primaryKeys(serializeBinaryTuples(rows))
+ .transactionId(tx.id())
+ .term(term)
+ .requestType(requestType)
+ .timestampLong(clock.nowLong())
+ .full(full)
+ .build();
+ }
+
+ private static boolean allSchemaVersionsSame(Collection<? extends
BinaryRow> rows) {
+ int schemaVersion = -1;
+ boolean first = true;
for (BinaryRow row : rows) {
- result.add(serializeBinaryRow(row));
+ if (first) {
+ schemaVersion = row.schemaVersion();
+ first = false;
+
+ continue;
+ }
+
+ if (row.schemaVersion() != schemaVersion) {
+ return false;
+ }
}
- return result;
+ return true;
}
- private BinaryRowMessage serializeBinaryRow(BinaryRow row) {
- return tableMessagesFactory.binaryRowMessage()
- .binaryTuple(row.tupleSlice())
- .schemaVersion(row.schemaVersion())
- .build();
+ private static Set<Integer> uniqueSchemaVersions(Collection<? extends
BinaryRow> rows) {
+ Set<Integer> set = new HashSet<>();
+
+ for (BinaryRow row : rows) {
+ set.add(row.schemaVersion());
+ }
+
+ return set;
}
- private static List<ByteBuffer> serializePrimaryKeys(Collection<? extends
BinaryRow> keys) {
+ private static List<ByteBuffer> serializeBinaryTuples(Collection<? extends
BinaryRow> keys) {
var result = new ArrayList<ByteBuffer>(keys.size());
for (BinaryRow row : keys) {
@@ -907,7 +947,8 @@ public class InternalTableImpl implements InternalTable {
(txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
- .binaryRowMessage(serializeBinaryRow(row))
+ .schemaVersion(row.schemaVersion())
+ .binaryTuple(row.tupleSlice())
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_UPSERT)
@@ -957,7 +998,8 @@ public class InternalTableImpl implements InternalTable {
(txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
- .binaryRowMessage(serializeBinaryRow(row))
+ .schemaVersion(row.schemaVersion())
+ .binaryTuple(row.tupleSlice())
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_GET_AND_UPSERT)
@@ -977,7 +1019,8 @@ public class InternalTableImpl implements InternalTable {
(txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
- .binaryRowMessage(serializeBinaryRow(row))
+ .schemaVersion(row.schemaVersion())
+ .binaryTuple(row.tupleSlice())
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_INSERT)
@@ -994,16 +1037,9 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
rows,
tx,
- (keyRows0, txo, groupId, term, full) ->
tableMessagesFactory.readWriteMultiRowReplicaRequest()
- .groupId(groupId)
-
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
- .binaryRowMessages(serializeBinaryRows(keyRows0))
- .transactionId(txo.id())
- .term(term)
- .requestType(RequestType.RW_INSERT_ALL)
- .timestampLong(clock.nowLong())
- .full(full)
- .build(),
+ (keyRows0, txo, groupId, term, full) -> {
+ return
readWriteMultiRowReplicaRequest(RequestType.RW_INSERT_ALL, keyRows0, txo,
groupId, term, full);
+ },
InternalTableImpl::collectRejectedRowsResponsesWithRestoreOrder,
(res, req) -> {
for (BinaryRow row : res) {
@@ -1018,6 +1054,29 @@ public class InternalTableImpl implements InternalTable {
);
}
+ private ReadWriteMultiRowReplicaRequest readWriteMultiRowReplicaRequest(
+ RequestType requestType,
+ Collection<? extends BinaryRow> rows,
+ InternalTransaction tx,
+ ReplicationGroupId groupId,
+ Long term,
+ boolean full
+ ) {
+ assert allSchemaVersionsSame(rows) : "Different schema versions
encountered: " + uniqueSchemaVersions(rows);
+
+ return tableMessagesFactory.readWriteMultiRowReplicaRequest()
+ .groupId(groupId)
+
.commitPartitionId(serializeTablePartitionId(tx.commitPartition()))
+ .schemaVersion(rows.iterator().next().schemaVersion())
+ .binaryTuples(serializeBinaryTuples(rows))
+ .transactionId(tx.id())
+ .term(term)
+ .requestType(requestType)
+ .timestampLong(clock.nowLong())
+ .full(full)
+ .build();
+ }
+
/** {@inheritDoc} */
@Override
public CompletableFuture<Boolean> replace(BinaryRowEx row,
InternalTransaction tx) {
@@ -1027,7 +1086,8 @@ public class InternalTableImpl implements InternalTable {
(txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
- .binaryRowMessage(serializeBinaryRow(row))
+ .schemaVersion(row.schemaVersion())
+ .binaryTuple(row.tupleSlice())
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_REPLACE_IF_EXIST)
@@ -1041,14 +1101,18 @@ public class InternalTableImpl implements InternalTable
{
/** {@inheritDoc} */
@Override
public CompletableFuture<Boolean> replace(BinaryRowEx oldRow, BinaryRowEx
newRow, InternalTransaction tx) {
+ assert oldRow.schemaVersion() == newRow.schemaVersion()
+ : "Mismatching schema versions: old " + oldRow.schemaVersion()
+ ", new " + newRow.schemaVersion();
+
return enlistInTx(
newRow,
tx,
(txo, groupId, term) ->
tableMessagesFactory.readWriteSwapRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
- .oldBinaryRowMessage(serializeBinaryRow(oldRow))
- .binaryRowMessage(serializeBinaryRow(newRow))
+ .schemaVersion(oldRow.schemaVersion())
+ .oldBinaryTuple(oldRow.tupleSlice())
+ .newBinaryTuple(newRow.tupleSlice())
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_REPLACE)
@@ -1068,7 +1132,8 @@ public class InternalTableImpl implements InternalTable {
(txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
- .binaryRowMessage(serializeBinaryRow(row))
+ .schemaVersion(row.schemaVersion())
+ .binaryTuple(row.tupleSlice())
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_GET_AND_REPLACE)
@@ -1088,6 +1153,7 @@ public class InternalTableImpl implements InternalTable {
(txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowPkReplicaRequest()
.groupId(groupId)
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
+ .schemaVersion(keyRow.schemaVersion())
.primaryKey(keyRow.tupleSlice())
.transactionId(txo.id())
.term(term)
@@ -1108,7 +1174,8 @@ public class InternalTableImpl implements InternalTable {
(txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
- .binaryRowMessage(serializeBinaryRow(oldRow))
+ .schemaVersion(oldRow.schemaVersion())
+ .binaryTuple(oldRow.tupleSlice())
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_DELETE_EXACT)
@@ -1128,6 +1195,7 @@ public class InternalTableImpl implements InternalTable {
(txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowPkReplicaRequest()
.groupId(groupId)
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
+ .schemaVersion(row.schemaVersion())
.primaryKey(row.tupleSlice())
.transactionId(txo.id())
.term(term)
@@ -1145,16 +1213,9 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
rows,
tx,
- (keyRows0, txo, groupId, term, full) ->
tableMessagesFactory.readWriteMultiRowPkReplicaRequest()
- .groupId(groupId)
-
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
- .primaryKeys(serializePrimaryKeys(keyRows0))
- .transactionId(txo.id())
- .term(term)
- .requestType(RequestType.RW_DELETE_ALL)
- .timestampLong(clock.nowLong())
- .full(full)
- .build(),
+ (keyRows0, txo, groupId, term, full) -> {
+ return readWriteMultiRowPkReplicaRequest(RW_DELETE_ALL,
keyRows0, txo, groupId, term, full);
+ },
InternalTableImpl::collectRejectedRowsResponsesWithRestoreOrder,
(res, req) -> {
for (BinaryRow row : res) {
@@ -1178,16 +1239,9 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
rows,
tx,
- (keyRows0, txo, groupId, term, full) ->
tableMessagesFactory.readWriteMultiRowReplicaRequest()
- .groupId(groupId)
-
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
- .binaryRowMessages(serializeBinaryRows(keyRows0))
- .transactionId(txo.id())
- .term(term)
- .requestType(RequestType.RW_DELETE_EXACT_ALL)
- .timestampLong(clock.nowLong())
- .full(full)
- .build(),
+ (keyRows0, txo, groupId, term, full) -> {
+ return
readWriteMultiRowReplicaRequest(RequestType.RW_DELETE_EXACT_ALL, keyRows0, txo,
groupId, term, full);
+ },
InternalTableImpl::collectRejectedRowsResponsesWithRestoreOrder,
(res, req) -> {
for (BinaryRow row : res) {
@@ -1978,16 +2032,7 @@ public class InternalTableImpl implements InternalTable {
) {
assert serializeTablePartitionId(txo.commitPartition()) != null;
- return tableMessagesFactory.readWriteMultiRowReplicaRequest()
- .groupId(groupId)
-
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
- .binaryRowMessages(serializeBinaryRows(keyRows0))
- .transactionId(txo.id())
- .term(term)
- .requestType(RequestType.RW_UPSERT_ALL)
- .timestampLong(clock.nowLong())
- .full(full)
- .build();
+ return readWriteMultiRowReplicaRequest(RequestType.RW_UPSERT_ALL,
keyRows0, txo, groupId, term, full);
}
@Override
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index bcd5149663..41d46aa1b2 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -19,6 +19,7 @@ package
org.apache.ignite.internal.table.distributed.replication;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.table.distributed.replication.PartitionReplicaListenerTest.binaryRowsToBuffers;
import static
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener.tablePartitionId;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.apache.ignite.internal.tx.TxState.checkTransitionCorrectness;
@@ -307,6 +308,7 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
.term(1L)
.commitPartitionId(tablePartitionId(PARTITION_ID))
.transactionId(TRANSACTION_ID)
+ .schemaVersion(testPk.schemaVersion())
.primaryKey(testPk.tupleSlice())
.requestType(arg.type)
.build();
@@ -324,7 +326,8 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
.term(1L)
.commitPartitionId(tablePartitionId(PARTITION_ID))
.transactionId(TRANSACTION_ID)
- .binaryRowMessage(binaryRowMessage(testBinaryRow))
+ .schemaVersion(testBinaryRow.schemaVersion())
+ .binaryTuple(testBinaryRow.tupleSlice())
.requestType(arg.type)
.build();
break;
@@ -387,6 +390,7 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
.term(1L)
.commitPartitionId(tablePartitionId(PARTITION_ID))
.transactionId(TRANSACTION_ID)
+ .schemaVersion(pks.iterator().next().schemaVersion())
.primaryKeys(pks.stream().map(BinaryRow::tupleSlice).collect(toList()))
.requestType(arg.type)
.build();
@@ -401,7 +405,8 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
.term(1L)
.commitPartitionId(tablePartitionId(PARTITION_ID))
.transactionId(TRANSACTION_ID)
-
.binaryRowMessages(rows.stream().map(PartitionReplicaListenerIndexLockingTest::binaryRowMessage).collect(toList()))
+ .schemaVersion(rows.iterator().next().schemaVersion())
+ .binaryTuples(binaryRowsToBuffers(rows))
.requestType(arg.type)
.build();
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 029ae0cc5f..bb1c861bfd 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -587,34 +587,34 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
public void testReadOnlySingleRowReplicaRequestEmptyResult() throws
Exception {
BinaryRow testBinaryKey = nextBinaryKey();
- ByteBuffer pk = testBinaryKey.tupleSlice();
-
- CompletableFuture<ReplicaResult> fut = doReadOnlySingleGet(pk);
+ CompletableFuture<ReplicaResult> fut =
doReadOnlySingleGet(testBinaryKey);
BinaryRow binaryRow = (BinaryRow) fut.get(1,
TimeUnit.SECONDS).result();
assertNull(binaryRow);
}
- private CompletableFuture<ReplicaResult> doReadOnlySingleGet(ByteBuffer
pk) {
+ private CompletableFuture<ReplicaResult> doReadOnlySingleGet(BinaryRow pk)
{
return doReadOnlySingleGet(pk, clock.now());
}
- private CompletableFuture<ReplicaResult> doReadOnlySingleGet(ByteBuffer
pk, HybridTimestamp readTimestamp) {
+ private CompletableFuture<ReplicaResult> doReadOnlySingleGet(BinaryRow pk,
HybridTimestamp readTimestamp) {
ReadOnlySingleRowPkReplicaRequest request =
TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
.groupId(grpId)
.readTimestampLong(readTimestamp.longValue())
- .primaryKey(pk)
+ .schemaVersion(pk.schemaVersion())
+ .primaryKey(pk.tupleSlice())
.requestType(RequestType.RO_GET)
.build();
return partitionReplicaListener.invoke(request, localNode.id());
}
- private CompletableFuture<ReplicaResult>
doReadOnlyDirectSingleGet(ByteBuffer pk) {
+ private CompletableFuture<ReplicaResult>
doReadOnlyDirectSingleGet(BinaryRow pk) {
ReadOnlyDirectSingleRowReplicaRequest request =
TABLE_MESSAGES_FACTORY.readOnlyDirectSingleRowReplicaRequest()
.groupId(grpId)
- .primaryKey(pk)
+ .schemaVersion(pk.schemaVersion())
+ .primaryKey(pk.tupleSlice())
.requestType(RequestType.RO_GET)
.enlistmentConsistencyToken(1L)
.build();
@@ -633,7 +633,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID,
PART_ID);
testMvPartitionStorage.commitWrite(rowId, clock.now());
- CompletableFuture<ReplicaResult> fut =
doReadOnlySingleGet(testBinaryKey.tupleSlice());
+ CompletableFuture<ReplicaResult> fut =
doReadOnlySingleGet(testBinaryKey);
BinaryRow binaryRow = (BinaryRow) fut.get(1,
TimeUnit.SECONDS).result();
@@ -652,7 +652,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID,
PART_ID);
txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.COMMITED,
localNode.id(), clock.now()));
- CompletableFuture<ReplicaResult> fut =
doReadOnlySingleGet(testBinaryKey.tupleSlice());
+ CompletableFuture<ReplicaResult> fut =
doReadOnlySingleGet(testBinaryKey);
BinaryRow binaryRow = (BinaryRow) fut.get(1,
TimeUnit.SECONDS).result();
@@ -670,7 +670,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID,
PART_ID);
txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.PENDING,
localNode.id(), null));
- CompletableFuture<ReplicaResult> fut =
doReadOnlySingleGet(testBinaryKey.tupleSlice());
+ CompletableFuture<ReplicaResult> fut =
doReadOnlySingleGet(testBinaryKey);
BinaryRow binaryRow = (BinaryRow) fut.get(1,
TimeUnit.SECONDS).result();
@@ -689,7 +689,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID,
PART_ID);
txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.ABORTED,
localNode.id(), null));
- CompletableFuture<ReplicaResult> fut =
doReadOnlySingleGet(testBinaryKey.tupleSlice());
+ CompletableFuture<ReplicaResult> fut =
doReadOnlySingleGet(testBinaryKey);
BinaryRow binaryRow = (BinaryRow) fut.get(1,
TimeUnit.SECONDS).result();
@@ -1111,7 +1111,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.groupId(grpId)
.transactionId(txId)
.requestType(requestType)
- .binaryRowMessage(binaryRowMessage(binaryRow))
+ .schemaVersion(binaryRow.schemaVersion())
+ .binaryTuple(binaryRow.tupleSlice())
.term(1L)
.commitPartitionId(commitPartitionId())
.full(full)
@@ -1129,6 +1130,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.groupId(grpId)
.transactionId(txId)
.requestType(requestType)
+ .schemaVersion(binaryRow.schemaVersion())
.primaryKey(binaryRow.tupleSlice())
.term(1L)
.commitPartitionId(commitPartitionId())
@@ -1154,7 +1156,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.groupId(grpId)
.transactionId(txId)
.requestType(requestType)
-
.binaryRowMessages(binaryRows.stream().map(PartitionReplicaListenerTest::binaryRowMessage).collect(toList()))
+
.schemaVersion(binaryRows.iterator().next().schemaVersion())
+ .binaryTuples(binaryRowsToBuffers(binaryRows))
.term(1L)
.commitPartitionId(commitPartitionId())
.full(full)
@@ -1163,6 +1166,10 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
);
}
+ static List<ByteBuffer> binaryRowsToBuffers(Collection<BinaryRow>
binaryRows) {
+ return
binaryRows.stream().map(BinaryRow::tupleSlice).collect(toList());
+ }
+
private CompletableFuture<?> doMultiRowPkRequest(UUID txId,
Collection<BinaryRow> binaryRows, RequestType requestType) {
return doMultiRowPkRequest(txId, binaryRows, requestType, false);
}
@@ -1172,7 +1179,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.groupId(grpId)
.transactionId(txId)
.requestType(requestType)
-
.primaryKeys(binaryRows.stream().map(BinaryRow::tupleSlice).collect(toList()))
+
.schemaVersion(binaryRows.iterator().next().schemaVersion())
+ .primaryKeys(binaryRowsToBuffers(binaryRows))
.term(1L)
.commitPartitionId(commitPartitionId())
.full(full)
@@ -1195,7 +1203,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.groupId(grpId)
.transactionId(txId)
.requestType(RequestType.RW_INSERT)
- .binaryRowMessage(binaryRowMessage(binaryRow))
+ .schemaVersion(binaryRow.schemaVersion())
+ .binaryTuple(binaryRow.tupleSlice())
.term(1L)
.commitPartitionId(commitPartitionId())
.build();
@@ -1222,7 +1231,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.groupId(grpId)
.transactionId(txId)
.requestType(RequestType.RW_UPSERT_ALL)
-
.binaryRowMessages(asList(binaryRowMessage(binaryRow0),
binaryRowMessage(binaryRow1)))
+ .schemaVersion(binaryRow0.schemaVersion())
+ .binaryTuples(asList(binaryRow0.tupleSlice(),
binaryRow1.tupleSlice()))
.term(1L)
.commitPartitionId(commitPartitionId())
.build();
@@ -1681,8 +1691,9 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.groupId(grpId)
.transactionId(targetTxId)
.requestType(RequestType.RW_REPLACE)
- .oldBinaryRowMessage(binaryRowMessage(oldRow))
- .binaryRowMessage(binaryRowMessage(newRow))
+ .schemaVersion(oldRow.schemaVersion())
+ .oldBinaryTuple(oldRow.tupleSlice())
+ .newBinaryTuple(newRow.tupleSlice())
.term(1L)
.commitPartitionId(commitPartitionId())
.full(full)
@@ -2149,9 +2160,9 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
) {
testRoOperationFailsIfTableWasDropped(onExistingRow, (targetTxId,
readTimestamp, key) -> {
if (direct) {
- return doReadOnlyDirectSingleGet(marshalQuietly(key,
kvMarshaller).tupleSlice());
+ return doReadOnlyDirectSingleGet(marshalQuietly(key,
kvMarshaller));
} else {
- return doReadOnlySingleGet(marshalQuietly(key,
kvMarshaller).tupleSlice(), readTimestamp);
+ return doReadOnlySingleGet(marshalQuietly(key, kvMarshaller),
readTimestamp);
}
});
}
@@ -2254,7 +2265,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.groupId(grpId)
.requestType(RequestType.RW_UPSERT)
.transactionId(txId)
- .binaryRowMessage(binaryRowMessage(row))
+ .schemaVersion(row.schemaVersion())
+ .binaryTuple(row.tupleSlice())
.term(1L)
.commitPartitionId(commitPartitionId())
.build(),
@@ -2267,6 +2279,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.groupId(grpId)
.requestType(RequestType.RW_DELETE)
.transactionId(txId)
+ .schemaVersion(row.schemaVersion())
.primaryKey(row.tupleSlice())
.term(1L)
.commitPartitionId(commitPartitionId())
@@ -2280,6 +2293,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.groupId(grpId)
.requestType(RequestType.RO_GET)
.readTimestampLong(readTimestamp)
+ .schemaVersion(row.schemaVersion())
.primaryKey(row.tupleSlice())
.build(),
localNode.id()
@@ -2301,7 +2315,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.groupId(grpId)
.requestType(RequestType.RO_GET_ALL)
.readTimestampLong(readTimestamp.longValue())
-
.primaryKeys(rows.stream().map(BinaryRow::tupleSlice).collect(toList()))
+ .schemaVersion(rows.iterator().next().schemaVersion())
+ .primaryKeys(binaryRowsToBuffers(rows))
.build();
return partitionReplicaListener.invoke(request, localNode.id());
@@ -2311,7 +2326,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
ReadOnlyDirectMultiRowReplicaRequest request =
TABLE_MESSAGES_FACTORY.readOnlyDirectMultiRowReplicaRequest()
.groupId(grpId)
.requestType(RequestType.RO_GET_ALL)
-
.primaryKeys(rows.stream().map(BinaryRow::tupleSlice).collect(toList()))
+ .schemaVersion(rows.iterator().next().schemaVersion())
+ .primaryKeys(binaryRowsToBuffers(rows))
.enlistmentConsistencyToken(1L)
.build();