This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 803b738636 IGNITE-19451 Prohibit some types from being Marshallable
(#2163)
803b738636 is described below
commit 803b7386360743f8be524f9554a81fc8980d3ea2
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Thu Jun 8 15:57:04 2023 +0300
IGNITE-19451 Prohibit some types from being Marshallable (#2163)
---
.../internal/network/processor/TypeUtils.java | 21 ++++-
.../messages/MarshallableTypesBlackList.java | 37 +++++++-
.../processor/messages/MessageImplGenerator.java | 6 +-
.../src/main/resources/marshallable.blacklist | 4 +
.../processor/MarshallableBlacklistTest.java | 36 ++++++--
.../processor/TransferableObjectProcessorTest.java | 13 ---
.../src/test/resources/marshallable.blacklist | 1 +
.../message/LeaseGrantedMessage.java | 17 ++--
.../negotiation/LeaseNegotiator.java | 4 +-
.../replicator/PlacementDriverReplicaSideTest.java | 4 +-
.../sql/engine/schema/IgniteTableImpl.java | 17 +++-
.../ignite/distributed/ReplicaUnavailableTest.java | 10 +--
.../table/distributed/TableMessageGroup.java | 6 ++
.../replication/request/BinaryTupleMessage.java} | 28 ++++--
.../request/MultipleRowReplicaRequest.java | 21 ++++-
.../request/ReadOnlyReplicaRequest.java | 8 +-
.../request/ScanRetrieveBatchReplicaRequest.java | 12 +--
.../request/SingleRowReplicaRequest.java | 9 +-
.../replication/request/SwapRowReplicaRequest.java | 16 +++-
.../replicator/PartitionReplicaListener.java | 23 +++--
.../distributed/storage/InternalTableImpl.java | 96 +++++++++++++--------
.../PartitionReplicaListenerIndexLockingTest.java | 5 +-
.../replication/PartitionReplicaListenerTest.java | 99 ++++++++++++----------
23 files changed, 335 insertions(+), 158 deletions(-)
diff --git
a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/TypeUtils.java
b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/TypeUtils.java
index 5f3cf13735..e5371672d6 100644
---
a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/TypeUtils.java
+++
b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/TypeUtils.java
@@ -46,6 +46,14 @@ public class TypeUtils {
this.elements = processingEnvironment.getElementUtils();
}
+ public Types types() {
+ return types;
+ }
+
+ public Elements elements() {
+ return elements;
+ }
+
/**
* Returns {@code true} if the <i>erasure</i> of the given types are
actually the same type.
*
@@ -69,7 +77,18 @@ public class TypeUtils {
public boolean isSubType(TypeMirror type1, Class<?> type2) {
TypeMirror type2Mirror = typeMirrorFromClass(type2);
- return types.isSubtype(erasure(type1), erasure(type2Mirror));
+ return isSubType(type1, type2Mirror);
+ }
+
+ /**
+ * Returns {@code true} if the <i>erasure</i> of the first type is a
subtype of the second type.
+ *
+ * @param type1 first type (represented by a mirror)
+ * @param type2 second type (represented by a mirror)
+ * @return {@code true} if the erasure of the first type is a subtype of
the second type, {@code false} otherwise.
+ */
+ public boolean isSubType(TypeMirror type1, TypeMirror type2) {
+ return types.isSubtype(erasure(type1), erasure(type2));
}
/**
diff --git
a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MarshallableTypesBlackList.java
b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MarshallableTypesBlackList.java
index a976b1406b..698f1045a2 100644
---
a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MarshallableTypesBlackList.java
+++
b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MarshallableTypesBlackList.java
@@ -17,17 +17,25 @@
package org.apache.ignite.internal.network.processor.messages;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.UUID;
+import java.util.stream.Collectors;
+import javax.lang.model.element.TypeElement;
import javax.lang.model.type.ArrayType;
import javax.lang.model.type.DeclaredType;
import javax.lang.model.type.TypeMirror;
import javax.lang.model.util.SimpleTypeVisitor9;
+import org.apache.ignite.internal.network.processor.ProcessingException;
import org.apache.ignite.internal.network.processor.TypeUtils;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.network.NetworkMessage;
@@ -38,6 +46,9 @@ import org.apache.ignite.network.annotations.Marshallable;
* {@code Marshallable} only if it is not supported by the Direct Marshaller
natively.
*/
public class MarshallableTypesBlackList {
+ /** Name of the file containing a newline-separated list of blacklisted
class names. */
+ private static final String BLACKLIST_FILE_NAME = "marshallable.blacklist";
+
/** Types supported by the Direct Marshaller. */
public static final List<Class<?>> NATIVE_TYPES = List.of(
// Primitive type wrappers
@@ -79,10 +90,28 @@ public class MarshallableTypesBlackList {
private static class TypeVisitor extends SimpleTypeVisitor9<Boolean, Void>
{
private final TypeUtils typeUtils;
+ private final List<TypeMirror> resourcesBlacklist;
+
TypeVisitor(TypeUtils typeUtils) {
super(false);
this.typeUtils = typeUtils;
+ this.resourcesBlacklist = readBlacklistFromResources();
+ }
+
+ private List<TypeMirror> readBlacklistFromResources() {
+ try (
+ InputStream is =
getClass().getClassLoader().getResourceAsStream(BLACKLIST_FILE_NAME);
+ BufferedReader reader = new BufferedReader(new
InputStreamReader(is))
+ ) {
+ return reader.lines()
+ .map(typeUtils.elements()::getTypeElement)
+ .filter(Objects::nonNull)
+ .map(TypeElement::asType)
+ .collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new ProcessingException("Unable to read " +
BLACKLIST_FILE_NAME, e);
+ }
}
@Override
@@ -97,11 +126,17 @@ public class MarshallableTypesBlackList {
return t.getTypeArguments().stream().anyMatch(this::visit);
}
- return !isSameType(NATIVE_TYPES, t) && !typeUtils.isSubType(t,
NetworkMessage.class);
+ return !isSameType(NATIVE_TYPES, t)
+ && !typeUtils.isSubType(t, NetworkMessage.class)
+ && !isSubType(resourcesBlacklist, t);
}
private boolean isSameType(List<Class<?>> types, DeclaredType type) {
return types.stream().anyMatch(cls -> typeUtils.isSameType(type,
cls));
}
+
+ private boolean isSubType(List<TypeMirror> types, DeclaredType type) {
+ return types.stream().anyMatch(cls -> typeUtils.isSubType(type,
cls));
+ }
}
}
diff --git
a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
index a7693f4e4b..091b9c3489 100644
---
a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
+++
b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
@@ -122,12 +122,12 @@ public class MessageImplGenerator {
if (isMarshallable &&
!marshallableTypesBlackList.canBeMarshallable(getterType)) {
String error = String.format(
- "\"%s\" field is marked as @Marshallable but this type
is supported by native serialization, "
- + "remove this annotation from the field",
+ "\"%s\" field is marked as @Marshallable but this type
is either directly supported by native serialization "
+ + "or is prohibited by a blacklist, remove
this annotation from the field",
getterName
);
- throw new ProcessingException(error);
+ throw new ProcessingException(error, null, getter);
}
FieldSpec.Builder fieldBuilder =
FieldSpec.builder(getterReturnType, getterName)
diff --git
a/modules/network-annotation-processor/src/main/resources/marshallable.blacklist
b/modules/network-annotation-processor/src/main/resources/marshallable.blacklist
new file mode 100644
index 0000000000..93280f77dd
--- /dev/null
+++
b/modules/network-annotation-processor/src/main/resources/marshallable.blacklist
@@ -0,0 +1,4 @@
+org.apache.ignite.internal.schema.BinaryRow
+org.apache.ignite.internal.schema.BinaryTuple
+org.apache.ignite.internal.schema.BinaryTuplePrefix
+org.apache.ignite.internal.hlc.HybridTimestamp
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/processor/MarshallableBlacklistTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/processor/MarshallableBlacklistTest.java
index a2f0e992d1..3c6783f581 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/processor/MarshallableBlacklistTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/processor/MarshallableBlacklistTest.java
@@ -27,11 +27,14 @@ import com.google.testing.compile.Compiler;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.stream.Stream;
import javax.tools.JavaFileObject;
+import org.apache.ignite.internal.network.message.ScaleCubeMessage;
import
org.apache.ignite.internal.network.processor.messages.MarshallableTypesBlackList;
+import org.apache.ignite.network.NetworkMessage;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -75,6 +78,7 @@ public class MarshallableBlacklistTest {
return List.of(
"package org.apache.ignite.internal.network.processor;",
+ "import
org.apache.ignite.internal.network.message.ScaleCubeMessage;",
"import org.apache.ignite.network.NetworkMessage;",
"import org.apache.ignite.network.annotations.Transferable;",
"import org.apache.ignite.network.annotations.Marshallable;",
@@ -102,7 +106,7 @@ public class MarshallableBlacklistTest {
Compilation compilation = compile(marshallableFieldSourceCode(type));
assertThat(compilation).hadErrorContaining(
- "\"foo\" field is marked as @Marshallable but this type is
supported by native serialization"
+ "\"foo\" field is marked as @Marshallable but this type is
either directly supported by native serialization"
);
}
@@ -112,7 +116,7 @@ public class MarshallableBlacklistTest {
Compilation compilation = compile(marshallableArraySourceCode(type));
assertThat(compilation).hadErrorContaining(
- "\"foo\" field is marked as @Marshallable but this type is
supported by native serialization"
+ "\"foo\" field is marked as @Marshallable but this type is
either directly supported by native serialization"
);
}
@@ -122,7 +126,7 @@ public class MarshallableBlacklistTest {
Compilation compilation =
compile(marshallableCollectionSourceCode(collectionType, type));
assertThat(compilation).hadErrorContaining(
- "\"foo\" field is marked as @Marshallable but this type is
supported by native serialization"
+ "\"foo\" field is marked as @Marshallable but this type is
either directly supported by native serialization"
);
}
@@ -145,7 +149,7 @@ public class MarshallableBlacklistTest {
Compilation compilation =
compile(marshallableMapSourceCode(Integer.class, String.class));
assertThat(compilation).hadErrorContaining(
- "\"foo\" field is marked as @Marshallable but this type is
supported by native serialization"
+ "\"foo\" field is marked as @Marshallable but this type is
either directly supported by native serialization"
);
}
@@ -154,7 +158,7 @@ public class MarshallableBlacklistTest {
Compilation compilation =
compile(marshallableNestedMapSourceCode(Integer.class, String.class));
assertThat(compilation).hadErrorContaining(
- "\"foo\" field is marked as @Marshallable but this type is
supported by native serialization"
+ "\"foo\" field is marked as @Marshallable but this type is
either directly supported by native serialization"
);
}
@@ -165,6 +169,28 @@ public class MarshallableBlacklistTest {
assertThat(compilation).succeededWithoutWarnings();
}
+ /**
+ * Tests that compilation fails if message's field is both {@link
NetworkMessage} and marked
+ * as {@link org.apache.ignite.network.annotations.Marshallable}.
+ */
+ @Test
+ void testMessageWithMarshallableMessage() {
+ Compilation compilation =
compile(marshallableFieldSourceCode(ScaleCubeMessage.class));
+
+ assertThat(compilation).hadErrorContaining(
+ "\"foo\" field is marked as @Marshallable but this type is
either directly supported by native serialization"
+ );
+ }
+
+ @Test
+ void testMessageWithBlockedByFileType() {
+ Compilation compilation =
compile(marshallableFieldSourceCode(Random.class));
+
+ assertThat(compilation).hadErrorContaining(
+ "\"foo\" field is marked as @Marshallable but this type is
either directly supported by native serialization"
+ );
+ }
+
private Compilation compile(List<String> code) {
return compiler.compile(
forSourceLines("org.apache.ignite.internal.network.processor.TestMessage",
code),
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/processor/TransferableObjectProcessorTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/processor/TransferableObjectProcessorTest.java
index 56c58442dc..ec119c6ab5 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/processor/TransferableObjectProcessorTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/processor/TransferableObjectProcessorTest.java
@@ -249,19 +249,6 @@ public class TransferableObjectProcessorTest {
);
}
- /**
- * Tests that compilation fails if message's field is both {@link
NetworkMessage} and marked
- * as {@link org.apache.ignite.network.annotations.Marshallable}.
- */
- @Test
- void testFieldBothNetworkMessageAndMarkedMarshallable() {
- Compilation compilation =
compile("MessageWithMarshallableNetworkMessageField");
-
- assertThat(compilation).hadErrorContaining(
- "\"msgField\" field is marked as @Marshallable but this type
is supported by native serialization"
- );
- }
-
/**
* Compiles the given network message.
*/
diff --git a/modules/network/src/test/resources/marshallable.blacklist
b/modules/network/src/test/resources/marshallable.blacklist
new file mode 100644
index 0000000000..c33559224b
--- /dev/null
+++ b/modules/network/src/test/resources/marshallable.blacklist
@@ -0,0 +1 @@
+java.util.Random
diff --git
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/LeaseGrantedMessage.java
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/LeaseGrantedMessage.java
index ef009b0001..87ab398299 100644
---
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/LeaseGrantedMessage.java
+++
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/LeaseGrantedMessage.java
@@ -17,8 +17,9 @@
package org.apache.ignite.internal.placementdriver.message;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+
import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.network.annotations.Marshallable;
import org.apache.ignite.network.annotations.Transferable;
/**
@@ -26,11 +27,17 @@ import org.apache.ignite.network.annotations.Transferable;
*/
@Transferable(PlacementDriverMessageGroup.LEASE_GRANTED_MESSAGE)
public interface LeaseGrantedMessage extends PlacementDriverReplicaMessage {
- @Marshallable
- HybridTimestamp leaseStartTime();
+ long leaseStartTimeLong();
+
+ default HybridTimestamp leaseStartTime() {
+ return hybridTimestamp(leaseStartTimeLong());
+ }
+
+ long leaseExpirationTimeLong();
- @Marshallable
- HybridTimestamp leaseExpirationTime();
+ default HybridTimestamp leaseExpirationTime() {
+ return hybridTimestamp(leaseExpirationTimeLong());
+ }
boolean force();
}
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
index 70fdd0e3d7..2d87ab0eae 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
@@ -76,8 +76,8 @@ public class LeaseNegotiator {
lease.getLeaseholder(),
PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessage()
.groupId(groupId)
- .leaseStartTime(lease.getStartTime())
- .leaseExpirationTime(lease.getExpirationTime())
+
.leaseStartTimeLong(lease.getStartTime().longValue())
+
.leaseExpirationTimeLong(lease.getExpirationTime().longValue())
.force(force)
.build(),
leaseInterval)
diff --git
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
index 5fcc8602ff..d5ffb24812 100644
---
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
+++
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
@@ -145,8 +145,8 @@ public class PlacementDriverReplicaSideTest {
) {
PlacementDriverReplicaMessage msg = MSG_FACTORY.leaseGrantedMessage()
.groupId(GRP_ID)
- .leaseStartTime(leaseStartTime)
- .leaseExpirationTime(leaseExpirationTime)
+ .leaseStartTimeLong(leaseStartTime.longValue())
+ .leaseExpirationTimeLong(leaseExpirationTime.longValue())
.force(force)
.build();
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
index 92f1300ce2..89617b514c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
@@ -356,7 +357,7 @@ public class IgniteTableImpl extends AbstractTable
implements IgniteTable, Updat
ReplicaRequest request =
MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
.groupId(partGroupId)
.commitPartitionId(commitPartitionId)
- .binaryRows(partToRows.getValue())
+
.binaryRowsBytes(serializeBinaryRows(partToRows.getValue()))
.transactionId(txAttributes.id())
.term(nodeWithTerm.term())
.requestType(RequestType.RW_UPSERT_ALL)
@@ -369,6 +370,16 @@ public class IgniteTableImpl extends AbstractTable
implements IgniteTable, Updat
return CompletableFuture.allOf(futures);
}
+ private static List<ByteBuffer> serializeBinaryRows(Collection<BinaryRow>
rows) {
+ var result = new ArrayList<ByteBuffer>(rows.size());
+
+ for (BinaryRow row : rows) {
+ result.add(row.byteBuffer());
+ }
+
+ return result;
+ }
+
/** {@inheritDoc} */
@Override
public <RowT> CompletableFuture<?> insertAll(
@@ -401,7 +412,7 @@ public class IgniteTableImpl extends AbstractTable
implements IgniteTable, Updat
ReplicaRequest request =
MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
.groupId(partGroupId)
.commitPartitionId(commitPartitionId)
- .binaryRows(partToRows.getValue())
+
.binaryRowsBytes(serializeBinaryRows(partToRows.getValue()))
.transactionId(txAttributes.id())
.term(nodeWithTerm.term())
.requestType(RequestType.RW_INSERT_ALL)
@@ -464,7 +475,7 @@ public class IgniteTableImpl extends AbstractTable
implements IgniteTable, Updat
ReplicaRequest request =
MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
.groupId(partGroupId)
.commitPartitionId(commitPartitionId)
- .binaryRows(partToRows.getValue())
+
.binaryRowsBytes(serializeBinaryRows(partToRows.getValue()))
.transactionId(txAttributes.id())
.term(nodeWithTerm.term())
.requestType(RequestType.RW_DELETE_ALL)
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index 6c85625e8e..f92dc3e9be 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -28,6 +28,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -47,7 +48,6 @@ import
org.apache.ignite.internal.replicator.message.ReplicaResponse;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.schema.row.RowAssembler;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
@@ -139,7 +139,7 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
.transactionId(TestTransactionIds.newTransactionId())
.commitPartitionId(tablePartitionId)
.timestampLong(clock.nowLong())
- .binaryRow(createKeyValueRow(1L, 1L))
+ .binaryRowBytes(createKeyValueRow(1L, 1L))
.requestType(RequestType.RW_GET)
.build();
@@ -181,7 +181,7 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
.transactionId(TestTransactionIds.newTransactionId())
.commitPartitionId(tablePartitionId)
.timestampLong(clock.nowLong())
- .binaryRow(createKeyValueRow(1L, 1L))
+ .binaryRowBytes(createKeyValueRow(1L, 1L))
.requestType(RequestType.RW_GET)
.build();
@@ -207,12 +207,12 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
assertTrue(e1.getCause() instanceof ReplicaUnavailableException,
e1.toString());
}
- private static Row createKeyValueRow(long id, long value) {
+ private static ByteBuffer createKeyValueRow(long id, long value) {
RowAssembler rowBuilder = new RowAssembler(SCHEMA);
rowBuilder.appendLong(id);
rowBuilder.appendLong(value);
- return new Row(SCHEMA, rowBuilder.build());
+ return rowBuilder.build().byteBuffer();
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
index 88373fd5e5..4ccb9ab6e9 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
@@ -34,6 +34,7 @@ import
org.apache.ignite.internal.table.distributed.raft.snapshot.message.Snapsh
import
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse.ResponseEntry;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse;
+import
org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlySingleRowReplicaRequest;
@@ -137,6 +138,11 @@ public interface TableMessageGroup {
*/
short SNAPSHOT_TX_DATA_RESPONSE = 16;
+ /**
+ * Message type for {@link BinaryTupleMessage}.
+ */
+ short BINARY_TUPLE = 17;
+
/**
* Message types for Table module RAFT commands.
*/
diff --git
a/modules/network/src/test/resources/org/apache/ignite/internal/network/processor/MessageWithMarshallableNetworkMessageField.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BinaryTupleMessage.java
similarity index 54%
rename from
modules/network/src/test/resources/org/apache/ignite/internal/network/processor/MessageWithMarshallableNetworkMessageField.java
rename to
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BinaryTupleMessage.java
index 07f49676b0..a9856ccb09 100644
---
a/modules/network/src/test/resources/org/apache/ignite/internal/network/processor/MessageWithMarshallableNetworkMessageField.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BinaryTupleMessage.java
@@ -15,15 +15,29 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.network.processor;
+package org.apache.ignite.internal.table.distributed.replication.request;
-import org.apache.ignite.internal.network.message.ScaleCubeMessage;
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTuplePrefix;
+import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Marshallable;
import org.apache.ignite.network.annotations.Transferable;
-@Transferable(1)
-public interface MessageWithMarshallableNetworkMessageField extends
NetworkMessage {
- @Marshallable
- ScaleCubeMessage msgField();
+/**
+ * Message for transferring a {@link BinaryTuple} or a {@link
BinaryTuplePrefix}.
+ */
+@Transferable(TableMessageGroup.BINARY_TUPLE)
+public interface BinaryTupleMessage extends NetworkMessage {
+ ByteBuffer tuple();
+
+ int elementCount();
+
+ default BinaryTuple asBinaryTuple() {
+ return new BinaryTuple(elementCount(), tuple());
+ }
+
+ default BinaryTuplePrefix asBinaryTuplePrefix() {
+ return new BinaryTuplePrefix(elementCount(), tuple());
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowReplicaRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowReplicaRequest.java
index 85a3e3724f..5b7c39b720 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowReplicaRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowReplicaRequest.java
@@ -17,9 +17,12 @@
package org.apache.ignite.internal.table.distributed.replication.request;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collection;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
import
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.network.annotations.Marshallable;
@@ -27,8 +30,22 @@ import org.apache.ignite.network.annotations.Marshallable;
* Multiple row replica request.
*/
public interface MultipleRowReplicaRequest extends ReplicaRequest {
- @Marshallable
- Collection<BinaryRow> binaryRows();
+ Collection<ByteBuffer> binaryRowsBytes();
+
+ /**
+ * Deserializes binary row byte buffers into binary rows.
+ */
+ default Collection<BinaryRow> binaryRows() {
+ Collection<ByteBuffer> binaryRowsBytes = binaryRowsBytes();
+
+ var result = new ArrayList<BinaryRow>(binaryRowsBytes.size());
+
+ for (ByteBuffer buffer : binaryRowsBytes) {
+ result.add(new ByteBufferRow(buffer));
+ }
+
+ return result;
+ }
@Marshallable
RequestType requestType();
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyReplicaRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyReplicaRequest.java
index 5a61b2e23e..d30e6fe243 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyReplicaRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyReplicaRequest.java
@@ -19,12 +19,14 @@ package
org.apache.ignite.internal.table.distributed.replication.request;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
-import org.apache.ignite.network.annotations.Marshallable;
/**
* Read only replica request.
*/
public interface ReadOnlyReplicaRequest extends ReplicaRequest {
- @Marshallable
- HybridTimestamp readTimestamp();
+ long readTimestampLong();
+
+ default HybridTimestamp readTimestamp() {
+ return HybridTimestamp.hybridTimestamp(readTimestampLong());
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java
index a0a8ebe030..32876b4d4c 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java
@@ -19,10 +19,7 @@ package
org.apache.ignite.internal.table.distributed.replication.request;
import java.util.BitSet;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
-import org.apache.ignite.internal.schema.BinaryTuple;
-import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
-import org.apache.ignite.network.annotations.Marshallable;
import org.jetbrains.annotations.Nullable;
/**
@@ -46,8 +43,7 @@ public interface ScanRetrieveBatchReplicaRequest extends
ReplicaRequest {
* @return Key to search.
*/
@Nullable
- @Marshallable
- BinaryTuple exactKey();
+ BinaryTupleMessage exactKey();
/**
* Gets a lower bound to choose entries from {@link SortedIndexStorage}.
Exclusivity is controlled by a {@link
@@ -56,8 +52,7 @@ public interface ScanRetrieveBatchReplicaRequest extends
ReplicaRequest {
* @return lower bound.
*/
@Nullable
- @Marshallable
- BinaryTuplePrefix lowerBound();
+ BinaryTupleMessage lowerBoundPrefix();
/**
* Gets an upper bound to choose entries from {@link SortedIndexStorage}.
Upper bound. Exclusivity is controlled by a {@link
@@ -66,8 +61,7 @@ public interface ScanRetrieveBatchReplicaRequest extends
ReplicaRequest {
* @return upper bound.
*/
@Nullable
- @Marshallable
- BinaryTuplePrefix upperBound();
+ BinaryTupleMessage upperBoundPrefix();
/**
* Gets control flags for {@link SortedIndexStorage}. {@link
SortedIndexStorage#GREATER} | {@link SortedIndexStorage#LESS} by default.
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java
index 4fef3610a1..98a4656483 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.table.distributed.replication.request;
+import java.nio.ByteBuffer;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
import
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.network.annotations.Marshallable;
@@ -26,8 +28,11 @@ import org.apache.ignite.network.annotations.Marshallable;
* Single-row replica request.
*/
public interface SingleRowReplicaRequest extends ReplicaRequest {
- @Marshallable
- BinaryRow binaryRow();
+ ByteBuffer binaryRowBytes();
+
+ default BinaryRow binaryRow() {
+ return new ByteBufferRow(binaryRowBytes());
+ }
@Marshallable
RequestType requestType();
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SwapRowReplicaRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SwapRowReplicaRequest.java
index 7067dc2f35..d088c702d3 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SwapRowReplicaRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SwapRowReplicaRequest.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.table.distributed.replication.request;
+import java.nio.ByteBuffer;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
import
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.network.annotations.Marshallable;
@@ -26,11 +28,17 @@ import org.apache.ignite.network.annotations.Marshallable;
* Dual row replica request.
*/
public interface SwapRowReplicaRequest extends ReplicaRequest {
- @Marshallable
- BinaryRow binaryRow();
+ ByteBuffer binaryRowBytes();
- @Marshallable
- BinaryRow oldBinaryRow();
+ default BinaryRow binaryRow() {
+ return new ByteBufferRow(binaryRowBytes());
+ }
+
+ ByteBuffer oldBinaryRowBytes();
+
+ default BinaryRow oldBinaryRow() {
+ return new ByteBufferRow(oldBinaryRowBytes());
+ }
@Marshallable
RequestType requestType();
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index b5533cccfd..1697b4725b 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -108,6 +108,7 @@ import
org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
import
org.apache.ignite.internal.table.distributed.command.UpdateCommandBuilder;
import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
+import
org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest;
@@ -434,7 +435,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
if (request.exactKey() != null) {
- assert request.lowerBound() == null && request.upperBound() ==
null : "Index lookup doesn't allow bounds.";
+ assert request.lowerBoundPrefix() == null &&
request.upperBoundPrefix() == null : "Index lookup doesn't allow bounds.";
return safeReadFuture.thenCompose(unused ->
lookupIndex(request, indexStorage));
}
@@ -668,7 +669,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
if (request.exactKey() != null) {
- assert request.lowerBound() == null && request.upperBound() ==
null : "Index lookup doesn't allow bounds.";
+ assert request.lowerBoundPrefix() == null &&
request.upperBoundPrefix() == null : "Index lookup doesn't allow bounds.";
return lookupIndex(request, indexStorage.storage());
}
@@ -731,7 +732,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
IgniteUuid cursorId = new IgniteUuid(request.transactionId(),
request.scanId());
- BinaryTuple key = request.exactKey();
+ BinaryTuple key = request.exactKey().asBinaryTuple();
Cursor<RowId> cursor = (Cursor<RowId>)
cursors.computeIfAbsent(cursorId,
id -> indexStorage.get(key));
@@ -755,7 +756,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
Integer indexId = request.indexToUse();
- BinaryTuple exactKey = request.exactKey();
+ BinaryTuple exactKey = request.exactKey().asBinaryTuple();
return lockManager.acquire(txId, new LockKey(indexId),
LockMode.IS).thenCompose(idxLock -> { // Index IS lock
return lockManager.acquire(txId, new LockKey(tableId()),
LockMode.IS).thenCompose(tblLock -> { // Table IS lock
@@ -792,8 +793,11 @@ public class PartitionReplicaListener implements
ReplicaListener {
Integer indexId = request.indexToUse();
- BinaryTuplePrefix lowerBound = request.lowerBound();
- BinaryTuplePrefix upperBound = request.upperBound();
+ BinaryTupleMessage lowerBoundMessage = request.lowerBoundPrefix();
+ BinaryTupleMessage upperBoundMessage = request.upperBoundPrefix();
+
+ BinaryTuplePrefix lowerBound = lowerBoundMessage == null ? null :
lowerBoundMessage.asBinaryTuplePrefix();
+ BinaryTuplePrefix upperBound = upperBoundMessage == null ? null :
upperBoundMessage.asBinaryTuplePrefix();
int flags = request.flags();
@@ -859,8 +863,11 @@ public class PartitionReplicaListener implements
ReplicaListener {
IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
- BinaryTuplePrefix lowerBound = request.lowerBound();
- BinaryTuplePrefix upperBound = request.upperBound();
+ BinaryTupleMessage lowerBoundMessage = request.lowerBoundPrefix();
+ BinaryTupleMessage upperBoundMessage = request.upperBoundPrefix();
+
+ BinaryTuplePrefix lowerBound = lowerBoundMessage == null ? null :
lowerBoundMessage.asBinaryTuplePrefix();
+ BinaryTuplePrefix upperBound = upperBoundMessage == null ? null :
upperBoundMessage.asBinaryTuplePrefix();
int flags = request.flags();
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 2c1720379c..ef2914eb6c 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -31,6 +31,7 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap.Entry;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.net.ConnectException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
@@ -52,6 +53,7 @@ import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.raft.Peer;
@@ -70,6 +72,7 @@ import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import
org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequestBuilder;
@@ -383,9 +386,9 @@ public class InternalTableImpl implements InternalTable {
.transactionId(tx.id())
.scanId(scanId)
.indexToUse(indexId)
- .exactKey(exactKey)
- .lowerBound(lowerBound)
- .upperBound(upperBound)
+ .exactKey(binaryTupleMessage(exactKey))
+ .lowerBoundPrefix(binaryTupleMessage(lowerBound))
+ .upperBoundPrefix(binaryTupleMessage(upperBound))
.flags(flags)
.columnsToInclude(columnsToInclude)
.batchSize(batchSize);
@@ -407,6 +410,17 @@ public class InternalTableImpl implements InternalTable {
return postEnlist(fut, false, tx);
}
+ private @Nullable BinaryTupleMessage binaryTupleMessage(@Nullable
BinaryTupleReader binaryTuple) {
+ if (binaryTuple == null) {
+ return null;
+ }
+
+ return tableMessagesFactory.binaryTupleMessage()
+ .tuple(binaryTuple.byteBuffer())
+ .elementCount(binaryTuple.elementCount())
+ .build();
+ }
+
/**
* Partition enlisting with retrying.
*
@@ -513,7 +527,7 @@ public class InternalTableImpl implements InternalTable {
tx,
(commitPart, txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
- .binaryRow(keyRow)
+ .binaryRowBytes(keyRow.byteBuffer())
.commitPartitionId(commitPart)
.transactionId(txo.id())
.term(term)
@@ -535,9 +549,9 @@ public class InternalTableImpl implements InternalTable {
return replicaSvc.invoke(recipientNode,
tableMessagesFactory.readOnlySingleRowReplicaRequest()
.groupId(partGroupId)
- .binaryRow(keyRow)
+ .binaryRowBytes(keyRow.byteBuffer())
.requestType(RequestType.RO_GET)
- .readTimestamp(readTimestamp)
+ .readTimestampLong(readTimestamp.longValue())
.build()
);
}
@@ -560,7 +574,7 @@ public class InternalTableImpl implements InternalTable {
tx,
(commitPart, keyRows0, txo, groupId, term) ->
tableMessagesFactory.readWriteMultiRowReplicaRequest()
.groupId(groupId)
- .binaryRows(keyRows0)
+ .binaryRowsBytes(serializeBinaryRows(keyRows0))
.commitPartitionId(commitPart)
.transactionId(txo.id())
.term(term)
@@ -589,9 +603,9 @@ public class InternalTableImpl implements InternalTable {
CompletableFuture<Object> fut = replicaSvc.invoke(recipientNode,
tableMessagesFactory.readOnlyMultiRowReplicaRequest()
.groupId(partGroupId)
- .binaryRows(partToRows.getValue())
+
.binaryRowsBytes(serializeBinaryRows(partToRows.getValue()))
.requestType(RequestType.RO_GET_ALL)
- .readTimestamp(readTimestamp)
+ .readTimestampLong(readTimestamp.longValue())
.build()
);
@@ -601,6 +615,16 @@ public class InternalTableImpl implements InternalTable {
return collectMultiRowsResponses(futures);
}
+ private static List<ByteBuffer> serializeBinaryRows(Collection<BinaryRow>
rows) {
+ var result = new ArrayList<ByteBuffer>(rows.size());
+
+ for (BinaryRow row : rows) {
+ result.add(row.byteBuffer());
+ }
+
+ return result;
+ }
+
/** {@inheritDoc} */
@Override
public CompletableFuture<Void> upsert(BinaryRowEx row, InternalTransaction
tx) {
@@ -610,7 +634,7 @@ public class InternalTableImpl implements InternalTable {
(commitPart, txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(commitPart)
- .binaryRow(row)
+ .binaryRowBytes(row.byteBuffer())
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_UPSERT)
@@ -627,7 +651,7 @@ public class InternalTableImpl implements InternalTable {
(commitPart, keyRows0, txo, groupId, term) ->
tableMessagesFactory.readWriteMultiRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(commitPart)
- .binaryRows(keyRows0)
+ .binaryRowsBytes(serializeBinaryRows(keyRows0))
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_UPSERT_ALL)
@@ -645,7 +669,7 @@ public class InternalTableImpl implements InternalTable {
(commitPart, txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(commitPart)
- .binaryRow(row)
+ .binaryRowBytes(row.byteBuffer())
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_GET_AND_UPSERT)
@@ -663,7 +687,7 @@ public class InternalTableImpl implements InternalTable {
(commitPart, txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(commitPart)
- .binaryRow(row)
+ .binaryRowBytes(row.byteBuffer())
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_INSERT)
@@ -681,7 +705,7 @@ public class InternalTableImpl implements InternalTable {
(commitPart, keyRows0, txo, groupId, term) ->
tableMessagesFactory.readWriteMultiRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(commitPart)
- .binaryRows(keyRows0)
+ .binaryRowsBytes(serializeBinaryRows(keyRows0))
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_INSERT_ALL)
@@ -699,7 +723,7 @@ public class InternalTableImpl implements InternalTable {
(commitPart, txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(commitPart)
- .binaryRow(row)
+ .binaryRowBytes(row.byteBuffer())
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_REPLACE_IF_EXIST)
@@ -717,8 +741,8 @@ public class InternalTableImpl implements InternalTable {
(commitPart, txo, groupId, term) ->
tableMessagesFactory.readWriteSwapRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(commitPart)
- .oldBinaryRow(oldRow)
- .binaryRow(newRow)
+ .oldBinaryRowBytes(oldRow.byteBuffer())
+ .binaryRowBytes(newRow.byteBuffer())
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_REPLACE)
@@ -736,7 +760,7 @@ public class InternalTableImpl implements InternalTable {
(commitPart, txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(commitPart)
- .binaryRow(row)
+ .binaryRowBytes(row.byteBuffer())
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_GET_AND_REPLACE)
@@ -754,7 +778,7 @@ public class InternalTableImpl implements InternalTable {
(commitPart, txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(commitPart)
- .binaryRow(keyRow)
+ .binaryRowBytes(keyRow.byteBuffer())
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_DELETE)
@@ -772,7 +796,7 @@ public class InternalTableImpl implements InternalTable {
(commitPart, txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(commitPart)
- .binaryRow(oldRow)
+ .binaryRowBytes(oldRow.byteBuffer())
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_DELETE_EXACT)
@@ -790,7 +814,7 @@ public class InternalTableImpl implements InternalTable {
(commitPart, txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(commitPart)
- .binaryRow(row)
+ .binaryRowBytes(row.byteBuffer())
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_GET_AND_DELETE)
@@ -808,7 +832,7 @@ public class InternalTableImpl implements InternalTable {
(commitPart, keyRows0, txo, groupId, term) ->
tableMessagesFactory.readWriteMultiRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(commitPart)
- .binaryRows(keyRows0)
+ .binaryRowsBytes(serializeBinaryRows(keyRows0))
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_DELETE_ALL)
@@ -829,7 +853,7 @@ public class InternalTableImpl implements InternalTable {
(commitPart, keyRows0, txo, groupId, term) ->
tableMessagesFactory.readWriteMultiRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(commitPart)
- .binaryRows(keyRows0)
+ .binaryRowsBytes(serializeBinaryRows(keyRows0))
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_DELETE_EXACT_ALL)
@@ -897,15 +921,15 @@ public class InternalTableImpl implements InternalTable {
ReadOnlyScanRetrieveBatchReplicaRequest request =
tableMessagesFactory.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(partGroupId)
- .readTimestamp(readTimestamp)
+ .readTimestampLong(readTimestamp.longValue())
// TODO: IGNITE-17666 Close cursor tx finish.
.transactionId(txId)
.scanId(scanId)
.batchSize(batchSize)
.indexToUse(indexId)
- .exactKey(exactKey)
- .lowerBound(lowerBound)
- .upperBound(upperBound)
+ .exactKey(binaryTupleMessage(exactKey))
+ .lowerBoundPrefix(binaryTupleMessage(lowerBound))
+ .upperBoundPrefix(binaryTupleMessage(upperBound))
.flags(flags)
.columnsToInclude(columnsToInclude)
.build();
@@ -1010,9 +1034,9 @@ public class InternalTableImpl implements InternalTable {
.transactionId(txId)
.scanId(scanId)
.indexToUse(indexId)
- .exactKey(exactKey)
- .lowerBound(lowerBound)
- .upperBound(upperBound)
+ .exactKey(binaryTupleMessage(exactKey))
+ .lowerBoundPrefix(binaryTupleMessage(lowerBound))
+ .upperBoundPrefix(binaryTupleMessage(upperBound))
.flags(flags)
.columnsToInclude(columnsToInclude)
.batchSize(batchSize)
@@ -1224,7 +1248,7 @@ public class InternalTableImpl implements InternalTable {
* Enlists a partition.
*
* @param partId Partition id.
- * @param tx The transaction.
+ * @param tx The transaction.
* @return The enlist future (then will a leader become known).
*/
protected CompletableFuture<IgniteBiTuple<ClusterNode, Long>> enlist(int
partId, InternalTransaction tx) {
@@ -1268,7 +1292,8 @@ public class InternalTableImpl implements InternalTable {
* The constructor.
*
* @param retrieveBatch Closure that gets a new batch from the remote
replica.
- * @param onClose The closure will be applied when {@link
Subscription#cancel} is invoked directly or the cursor is finished.
+ * @param onClose The closure will be applied when {@link
Subscription#cancel} is invoked directly or the cursor is
+ * finished.
*/
PartitionScanPublisher(
BiFunction<Long, Integer,
CompletableFuture<Collection<BinaryRow>>> retrieveBatch,
@@ -1429,6 +1454,7 @@ public class InternalTableImpl implements InternalTable {
}
// TODO: IGNITE-17963 Use smarter logic for recipient node evaluation.
+
/**
* Evaluated cluster node for read-only request processing.
*
@@ -1452,9 +1478,9 @@ public class InternalTableImpl implements InternalTable {
}
/**
- * Casts any exception type to a client exception, wherein {@link
ReplicationException} and {@link LockException} are wrapped
- * to {@link TransactionException}, but another exceptions are wrapped to
a common exception.
- * The method does not wrap an exception if the exception already inherits
type of {@link RuntimeException}.
+ * Casts any exception type to a client exception, wherein {@link
ReplicationException} and {@link LockException} are wrapped to
+ * {@link TransactionException}, but another exceptions are wrapped to a
common exception. The method does not wrap an exception if the
+ * exception already inherits type of {@link RuntimeException}.
*
* @param e An instance exception to cast to client side one.
* @return {@link IgniteException} An instance of client side exception.
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index d679938837..ff830f9d9b 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.table.distributed.replication;
+import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
@@ -252,7 +253,7 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
.term(1L)
.commitPartitionId(PARTITION_ID)
.transactionId(TRANSACTION_ID)
- .binaryRow(testBinaryRow)
+ .binaryRowBytes(testBinaryRow.byteBuffer())
.requestType(arg.type)
.build());
@@ -298,7 +299,7 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
.term(1L)
.commitPartitionId(PARTITION_ID)
.transactionId(TRANSACTION_ID)
- .binaryRows(rows)
+
.binaryRowsBytes(rows.stream().map(BinaryRow::byteBuffer).collect(toList()))
.requestType(arg.type)
.build());
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 44ae746ff6..cf033a7a8d 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.table.distributed.replication;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
import static
org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrowFast;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
@@ -79,7 +80,6 @@ import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowConverter;
import org.apache.ignite.internal.schema.BinaryTuple;
-import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -116,6 +116,7 @@ import
org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
+import
org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaAbortException;
import
org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaException;
@@ -512,8 +513,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
CompletableFuture<?> fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
.groupId(grpId)
- .readTimestamp(clock.now())
- .binaryRow(testBinaryKey)
+ .readTimestampLong(clock.nowLong())
+ .binaryRowBytes(testBinaryKey.byteBuffer())
.requestType(RequestType.RO_GET)
.build());
@@ -535,8 +536,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
CompletableFuture<?> fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
.groupId(grpId)
- .readTimestamp(clock.now())
- .binaryRow(testBinaryKey)
+ .readTimestampLong(clock.nowLong())
+ .binaryRowBytes(testBinaryKey.byteBuffer())
.requestType(RequestType.RO_GET)
.build());
@@ -558,8 +559,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
CompletableFuture<?> fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
.groupId(grpId)
- .readTimestamp(clock.now())
- .binaryRow(testBinaryKey)
+ .readTimestampLong(clock.nowLong())
+ .binaryRowBytes(testBinaryKey.byteBuffer())
.requestType(RequestType.RO_GET)
.build());
@@ -580,8 +581,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
CompletableFuture<?> fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
.groupId(grpId)
- .readTimestamp(clock.now())
- .binaryRow(testBinaryKey)
+ .readTimestampLong(clock.nowLong())
+ .binaryRowBytes(testBinaryKey.byteBuffer())
.requestType(RequestType.RO_GET)
.build());
@@ -603,8 +604,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
CompletableFuture<?> fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
.groupId(grpId)
- .readTimestamp(clock.now())
- .binaryRow(testBinaryKey)
+ .readTimestampLong(clock.nowLong())
+ .binaryRowBytes(testBinaryKey.byteBuffer())
.requestType(RequestType.RO_GET)
.build());
@@ -674,8 +675,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.term(1L)
.scanId(2L)
.indexToUse(sortedIndexId)
- .lowerBound(toIndexBound(1))
- .upperBound(toIndexBound(3))
+ .lowerBoundPrefix(toIndexBound(1))
+ .upperBoundPrefix(toIndexBound(3))
.flags(SortedIndexStorage.LESS_OR_EQUAL)
.batchSize(5)
.build());
@@ -693,7 +694,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.term(1L)
.scanId(2L)
.indexToUse(sortedIndexId)
- .lowerBound(toIndexBound(5))
+ .lowerBoundPrefix(toIndexBound(5))
.batchSize(5)
.build());
@@ -745,7 +746,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
CompletableFuture<?> fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(grpId)
.transactionId(scanTxId)
- .readTimestamp(clock.now())
+ .readTimestampLong(clock.nowLong())
.scanId(1L)
.indexToUse(sortedIndexId)
.batchSize(4)
@@ -760,7 +761,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(grpId)
.transactionId(scanTxId)
- .readTimestamp(clock.now())
+ .readTimestampLong(clock.nowLong())
.scanId(1L)
.indexToUse(sortedIndexId)
.batchSize(4)
@@ -775,11 +776,11 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(grpId)
.transactionId(TestTransactionIds.newTransactionId())
- .readTimestamp(clock.now())
+ .readTimestampLong(clock.nowLong())
.scanId(2L)
.indexToUse(sortedIndexId)
- .lowerBound(toIndexBound(1))
- .upperBound(toIndexBound(3))
+ .lowerBoundPrefix(toIndexBound(1))
+ .upperBoundPrefix(toIndexBound(3))
.flags(SortedIndexStorage.LESS_OR_EQUAL)
.batchSize(5)
.build());
@@ -793,10 +794,10 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(grpId)
.transactionId(TestTransactionIds.newTransactionId())
- .readTimestamp(clock.now())
+ .readTimestampLong(clock.nowLong())
.scanId(2L)
.indexToUse(sortedIndexId)
- .lowerBound(toIndexBound(5))
+ .lowerBoundPrefix(toIndexBound(5))
.batchSize(5)
.build());
@@ -809,7 +810,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(grpId)
.transactionId(TestTransactionIds.newTransactionId())
- .readTimestamp(clock.now())
+ .readTimestampLong(clock.nowLong())
.scanId(2L)
.indexToUse(sortedIndexId)
.exactKey(toIndexKey(0))
@@ -847,7 +848,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
CompletableFuture<?> fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(grpId)
.transactionId(scanTxId)
- .readTimestamp(clock.now())
+ .readTimestampLong(clock.nowLong())
.scanId(1L)
.indexToUse(hashIndexId)
.exactKey(toIndexKey(0))
@@ -863,7 +864,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(grpId)
.transactionId(scanTxId)
- .readTimestamp(clock.now())
+ .readTimestampLong(clock.nowLong())
.scanId(1L)
.indexToUse(hashIndexId)
.exactKey(toIndexKey(0))
@@ -879,7 +880,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(grpId)
.transactionId(TestTransactionIds.newTransactionId())
- .readTimestamp(clock.now())
+ .readTimestampLong(clock.nowLong())
.scanId(2L)
.indexToUse(hashIndexId)
.exactKey(toIndexKey(5))
@@ -895,7 +896,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(grpId)
.transactionId(TestTransactionIds.newTransactionId())
- .readTimestamp(clock.now())
+ .readTimestampLong(clock.nowLong())
.scanId(2L)
.indexToUse(hashIndexId)
.exactKey(toIndexKey(1))
@@ -985,7 +986,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.groupId(grpId)
.transactionId(txId)
.requestType(requestType)
- .binaryRow(binaryRow)
+ .binaryRowBytes(binaryRow.byteBuffer())
.term(1L)
.commitPartitionId(commitPartitionId())
.build()
@@ -1001,7 +1002,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.groupId(grpId)
.transactionId(txId)
.requestType(requestType)
- .binaryRows(binaryRows)
+
.binaryRowsBytes(binaryRows.stream().map(BinaryRow::byteBuffer).collect(toList()))
.term(1L)
.commitPartitionId(commitPartitionId())
.build()
@@ -1022,7 +1023,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.groupId(grpId)
.transactionId(txId)
.requestType(RequestType.RW_INSERT)
- .binaryRow(binaryRow)
+ .binaryRowBytes(binaryRow.byteBuffer())
.term(1L)
.commitPartitionId(commitPartitionId())
.build();
@@ -1049,7 +1050,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.groupId(grpId)
.transactionId(txId)
.requestType(RequestType.RW_UPSERT_ALL)
- .binaryRows(asList(binaryRow0, binaryRow1))
+ .binaryRowsBytes(asList(binaryRow0.byteBuffer(),
binaryRow1.byteBuffer()))
.term(1L)
.commitPartitionId(commitPartitionId())
.build();
@@ -1213,7 +1214,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
Set<BinaryRow> expected = committed
? (upsertAfterDelete ? allRows : allRowsButModified)
: (insertFirst ? allRows : Set.of());
- Set<BinaryRow> res = new HashSet<>(roGetAll(allRows, clock.now()));
+ Set<BinaryRow> res = new HashSet<>(roGetAll(allRows,
clock.nowLong()));
assertEquals(expected.size(), res.size());
for (BinaryRow e : expected) {
@@ -1221,7 +1222,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
res.contains(e);
}
} else {
- BinaryRow res = roGet(br1, clock.now());
+ BinaryRow res = roGet(br1, clock.nowLong());
BinaryRow expected = committed
? (upsertAfterDelete ? br1 : null)
: (insertFirst ? br1 : null);
@@ -1465,8 +1466,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.groupId(grpId)
.transactionId(targetTxId)
.requestType(RequestType.RW_REPLACE)
- .oldBinaryRow(binaryRow(key, new TestValue(1, "v1"),
kvMarshaller))
- .binaryRow(binaryRow(key, new TestValue(3, "v3"),
kvMarshaller))
+ .oldBinaryRowBytes(binaryRow(key, new TestValue(1,
"v1"), kvMarshaller).byteBuffer())
+ .binaryRowBytes(binaryRow(key, new TestValue(3, "v3"),
kvMarshaller).byteBuffer())
.term(1L)
.commitPartitionId(commitPartitionId())
.build()
@@ -1528,7 +1529,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.groupId(grpId)
.requestType(RequestType.RW_UPSERT)
.transactionId(txId)
- .binaryRow(row)
+ .binaryRowBytes(row.byteBuffer())
.term(1L)
.commitPartitionId(new TablePartitionId(tblId, partId))
.build()
@@ -1540,31 +1541,31 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.groupId(grpId)
.requestType(RequestType.RW_DELETE)
.transactionId(txId)
- .binaryRow(row)
+ .binaryRowBytes(row.byteBuffer())
.term(1L)
.commitPartitionId(new TablePartitionId(tblId, partId))
.build()
).join();
}
- private BinaryRow roGet(BinaryRow row, HybridTimestamp readTimestamp) {
+ private BinaryRow roGet(BinaryRow row, long readTimestamp) {
CompletableFuture<?> future =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
.groupId(grpId)
.requestType(RequestType.RO_GET)
- .readTimestamp(readTimestamp)
- .binaryRow(row)
+ .readTimestampLong(readTimestamp)
+ .binaryRowBytes(row.byteBuffer())
.build()
);
return (BinaryRow) future.join();
}
- private List<BinaryRow> roGetAll(Collection<BinaryRow> rows,
HybridTimestamp readTimestamp) {
+ private List<BinaryRow> roGetAll(Collection<BinaryRow> rows, long
readTimestamp) {
CompletableFuture<?> future =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyMultiRowReplicaRequest()
.groupId(grpId)
.requestType(RequestType.RO_GET_ALL)
- .readTimestamp(readTimestamp)
- .binaryRows(rows)
+ .readTimestampLong(readTimestamp)
+
.binaryRowsBytes(rows.stream().map(BinaryRow::byteBuffer).collect(toList()))
.build()
);
@@ -1584,16 +1585,22 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
txState = TxState.COMMITED;
}
- private BinaryTuplePrefix toIndexBound(int val) {
+ private BinaryTupleMessage toIndexBound(int val) {
ByteBuffer tuple = new BinaryTuplePrefixBuilder(1,
1).appendInt(val).build();
- return new BinaryTuplePrefix(1, tuple);
+ return TABLE_MESSAGES_FACTORY.binaryTupleMessage()
+ .tuple(tuple)
+ .elementCount(1)
+ .build();
}
- private BinaryTuple toIndexKey(int val) {
+ private BinaryTupleMessage toIndexKey(int val) {
ByteBuffer tuple = new BinaryTupleBuilder(1,
true).appendInt(val).build();
- return new BinaryTuple(1, tuple);
+ return TABLE_MESSAGES_FACTORY.binaryTupleMessage()
+ .tuple(tuple)
+ .elementCount(1)
+ .build();
}
private BinaryRow nextBinaryKey() {