This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 951ae594ed IGNITE-17739 Add Partition Awareness to all table APIs in
Java client (#1119)
951ae594ed is described below
commit 951ae594eda4d5403ac18576aa44b168e56e4a19
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Mon Sep 26 12:29:12 2022 +0300
IGNITE-17739 Add Partition Awareness to all table APIs in Java client
(#1119)
* Move common hash calculation logic to `ClientTupleSerializer`.
* Add partition awareness to all APIs in `ClientRecordView`,
`ClientRecordBinaryView`, `ClientKeyValueView`, `ClientKeyValueBinaryView`.
---
.../client/table/ClientKeyValueBinaryView.java | 64 ++++--
.../internal/client/table/ClientKeyValueView.java | 46 ++--
.../client/table/ClientRecordBinaryView.java | 86 +++++---
.../internal/client/table/ClientRecordView.java | 69 ++++--
.../ignite/internal/client/table/ClientTable.java | 51 ++++-
.../client/table/ClientTupleSerializer.java | 40 ++++
.../ignite/client/PartitionAwarenessTest.java | 235 ++++++++++++++++++++-
.../org/apache/ignite/client/RetryPolicyTest.java | 2 +-
.../ignite/client/fakes/FakeInternalTable.java | 62 +++---
9 files changed, 526 insertions(+), 129 deletions(-)
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueBinaryView.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueBinaryView.java
index a589762a39..edf5be97ca 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueBinaryView.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueBinaryView.java
@@ -75,7 +75,9 @@ public class ClientKeyValueBinaryView implements
KeyValueView<Tuple, Tuple> {
return tbl.doSchemaOutInOpAsync(
ClientOp.TUPLE_GET,
(s, w) -> ser.writeTuple(tx, key, s, w, true),
- ClientTupleSerializer::readValueTuple);
+ ClientTupleSerializer::readValueTuple,
+ null,
+ ClientTupleSerializer.getHashFunction(tx, key));
}
/** {@inheritDoc} */
@@ -89,11 +91,16 @@ public class ClientKeyValueBinaryView implements
KeyValueView<Tuple, Tuple> {
public @NotNull CompletableFuture<Map<Tuple, Tuple>> getAllAsync(@Nullable
Transaction tx, @NotNull Collection<Tuple> keys) {
Objects.requireNonNull(keys);
+ if (keys.isEmpty()) {
+ return CompletableFuture.completedFuture(Collections.emptyMap());
+ }
+
return tbl.doSchemaOutInOpAsync(
ClientOp.TUPLE_GET_ALL,
(s, w) -> ser.writeTuples(tx, keys, s, w, true),
ClientTupleSerializer::readKvTuplesNullable,
- Collections.emptyMap());
+ Collections.emptyMap(),
+ ClientTupleSerializer.getHashFunction(tx,
keys.iterator().next()));
}
/**
@@ -130,7 +137,9 @@ public class ClientKeyValueBinaryView implements
KeyValueView<Tuple, Tuple> {
return tbl.doSchemaOutInOpAsync(
ClientOp.TUPLE_GET,
(s, w) -> ser.writeTuple(tx, key, s, w, true),
- (s, r) ->
IgniteUtils.nonNullOrElse(ClientTupleSerializer.readValueTuple(s, r),
defaultValue));
+ (s, r) ->
IgniteUtils.nonNullOrElse(ClientTupleSerializer.readValueTuple(s, r),
defaultValue),
+ null,
+ ClientTupleSerializer.getHashFunction(tx, key));
}
/** {@inheritDoc} */
@@ -147,7 +156,8 @@ public class ClientKeyValueBinaryView implements
KeyValueView<Tuple, Tuple> {
return tbl.doSchemaOutOpAsync(
ClientOp.TUPLE_CONTAINS_KEY,
(s, w) -> ser.writeTuple(tx, key, s, w, true),
- ClientMessageUnpacker::unpackBoolean);
+ ClientMessageUnpacker::unpackBoolean,
+ ClientTupleSerializer.getHashFunction(tx, key));
}
/** {@inheritDoc} */
@@ -164,7 +174,8 @@ public class ClientKeyValueBinaryView implements
KeyValueView<Tuple, Tuple> {
return tbl.doSchemaOutOpAsync(
ClientOp.TUPLE_UPSERT,
(s, w) -> ser.writeKvTuple(tx, key, val, s, w, false),
- r -> null);
+ r -> null,
+ ClientTupleSerializer.getHashFunction(tx, key));
}
/** {@inheritDoc} */
@@ -178,15 +189,20 @@ public class ClientKeyValueBinaryView implements
KeyValueView<Tuple, Tuple> {
public @NotNull CompletableFuture<Void> putAllAsync(@Nullable Transaction
tx, @NotNull Map<Tuple, Tuple> pairs) {
Objects.requireNonNull(pairs);
+ if (pairs.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+
return tbl.doSchemaOutOpAsync(
ClientOp.TUPLE_UPSERT_ALL,
(s, w) -> ser.writeKvTuples(tx, pairs, s, w),
- r -> null);
+ r -> null,
+ ClientTupleSerializer.getHashFunction(tx,
pairs.keySet().iterator().next()));
}
/** {@inheritDoc} */
@Override
- public Tuple getAndPut(@Nullable Transaction tx, @NotNull Tuple key, Tuple
val) {
+ public Tuple getAndPut(@Nullable Transaction tx, @NotNull Tuple key,
@NotNull Tuple val) {
return sync(getAndPutAsync(tx, key, val));
}
@@ -199,7 +215,9 @@ public class ClientKeyValueBinaryView implements
KeyValueView<Tuple, Tuple> {
return tbl.doSchemaOutInOpAsync(
ClientOp.TUPLE_GET_AND_UPSERT,
(s, w) -> ser.writeKvTuple(tx, key, val, s, w, false),
- ClientTupleSerializer::readValueTuple);
+ ClientTupleSerializer::readValueTuple,
+ null,
+ ClientTupleSerializer.getHashFunction(tx, key));
}
/**
@@ -237,7 +255,8 @@ public class ClientKeyValueBinaryView implements
KeyValueView<Tuple, Tuple> {
return tbl.doSchemaOutOpAsync(
ClientOp.TUPLE_INSERT,
(s, w) -> ser.writeKvTuple(tx, key, val, s, w, false),
- ClientMessageUnpacker::unpackBoolean);
+ ClientMessageUnpacker::unpackBoolean,
+ ClientTupleSerializer.getHashFunction(tx, key));
}
/** {@inheritDoc} */
@@ -260,7 +279,8 @@ public class ClientKeyValueBinaryView implements
KeyValueView<Tuple, Tuple> {
return tbl.doSchemaOutOpAsync(
ClientOp.TUPLE_DELETE,
(s, w) -> ser.writeTuple(tx, key, s, w, true),
- ClientMessageUnpacker::unpackBoolean);
+ ClientMessageUnpacker::unpackBoolean,
+ ClientTupleSerializer.getHashFunction(tx, key));
}
/** {@inheritDoc} */
@@ -272,7 +292,8 @@ public class ClientKeyValueBinaryView implements
KeyValueView<Tuple, Tuple> {
return tbl.doSchemaOutOpAsync(
ClientOp.TUPLE_DELETE_EXACT,
(s, w) -> ser.writeKvTuple(tx, key, val, s, w, false),
- ClientMessageUnpacker::unpackBoolean);
+ ClientMessageUnpacker::unpackBoolean,
+ ClientTupleSerializer.getHashFunction(tx, key));
}
/** {@inheritDoc} */
@@ -286,11 +307,16 @@ public class ClientKeyValueBinaryView implements
KeyValueView<Tuple, Tuple> {
public @NotNull CompletableFuture<Collection<Tuple>>
removeAllAsync(@Nullable Transaction tx, @NotNull Collection<Tuple> keys) {
Objects.requireNonNull(keys);
+ if (keys.isEmpty()) {
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
+
return tbl.doSchemaOutInOpAsync(
ClientOp.TUPLE_DELETE_ALL,
(s, w) -> ser.writeTuples(tx, keys, s, w, true),
(s, r) -> ClientTupleSerializer.readTuples(s, r, true),
- Collections.emptyList());
+ Collections.emptyList(),
+ ClientTupleSerializer.getHashFunction(tx,
keys.iterator().next()));
}
/** {@inheritDoc} */
@@ -307,7 +333,9 @@ public class ClientKeyValueBinaryView implements
KeyValueView<Tuple, Tuple> {
return tbl.doSchemaOutInOpAsync(
ClientOp.TUPLE_GET_AND_DELETE,
(s, w) -> ser.writeTuple(tx, key, s, w, true),
- ClientTupleSerializer::readValueTuple);
+ ClientTupleSerializer::readValueTuple,
+ null,
+ ClientTupleSerializer.getHashFunction(tx, key));
}
/**
@@ -350,7 +378,8 @@ public class ClientKeyValueBinaryView implements
KeyValueView<Tuple, Tuple> {
return tbl.doSchemaOutOpAsync(
ClientOp.TUPLE_REPLACE,
(s, w) -> ser.writeKvTuple(tx, key, val, s, w, false),
- ClientMessageUnpacker::unpackBoolean);
+ ClientMessageUnpacker::unpackBoolean,
+ ClientTupleSerializer.getHashFunction(tx, key));
}
/** {@inheritDoc} */
@@ -364,7 +393,8 @@ public class ClientKeyValueBinaryView implements
KeyValueView<Tuple, Tuple> {
ser.writeKvTuple(tx, key, oldVal, s, w, false);
ser.writeKvTuple(tx, key, newVal, s, w, true);
},
- ClientMessageUnpacker::unpackBoolean);
+ ClientMessageUnpacker::unpackBoolean,
+ ClientTupleSerializer.getHashFunction(tx, key));
}
/** {@inheritDoc} */
@@ -382,7 +412,9 @@ public class ClientKeyValueBinaryView implements
KeyValueView<Tuple, Tuple> {
return tbl.doSchemaOutInOpAsync(
ClientOp.TUPLE_GET_AND_REPLACE,
(s, w) -> ser.writeKvTuple(tx, key, val, s, w, false),
- ClientTupleSerializer::readValueTuple);
+ ClientTupleSerializer::readValueTuple,
+ null,
+ ClientTupleSerializer.getHashFunction(tx, key));
}
/**
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java
index 224ad509c3..ba7fb570d2 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java
@@ -95,7 +95,9 @@ public class ClientKeyValueView<K, V> implements
KeyValueView<K, V> {
return tbl.doSchemaOutInOpAsync(
ClientOp.TUPLE_GET,
(s, w) -> keySer.writeRec(tx, key, s, w, TuplePart.KEY),
- (s, r) -> valSer.readRec(s, r, TuplePart.VAL));
+ (s, r) -> valSer.readRec(s, r, TuplePart.VAL),
+ null,
+ ClientTupleSerializer.getHashFunction(tx, keySer.mapper(),
key));
}
/** {@inheritDoc} */
@@ -141,7 +143,8 @@ public class ClientKeyValueView<K, V> implements
KeyValueView<K, V> {
ClientOp.TUPLE_GET_ALL,
(s, w) -> keySer.writeRecs(tx, keys, s, w, TuplePart.KEY),
this::readGetAllResponse,
- Collections.emptyMap());
+ Collections.emptyMap(),
+ ClientTupleSerializer.getHashFunction(tx, keySer.mapper(),
keys.iterator().next()));
}
/** {@inheritDoc} */
@@ -158,7 +161,8 @@ public class ClientKeyValueView<K, V> implements
KeyValueView<K, V> {
return tbl.doSchemaOutOpAsync(
ClientOp.TUPLE_CONTAINS_KEY,
(s, w) -> keySer.writeRec(tx, key, s, w, TuplePart.KEY),
- ClientMessageUnpacker::unpackBoolean);
+ ClientMessageUnpacker::unpackBoolean,
+ ClientTupleSerializer.getHashFunction(tx, keySer.mapper(),
key));
}
/** {@inheritDoc} */
@@ -175,7 +179,8 @@ public class ClientKeyValueView<K, V> implements
KeyValueView<K, V> {
return tbl.doSchemaOutOpAsync(
ClientOp.TUPLE_UPSERT,
(s, w) -> writeKeyValue(s, w, tx, key, val),
- r -> null);
+ r -> null,
+ ClientTupleSerializer.getHashFunction(tx, keySer.mapper(),
key));
}
/** {@inheritDoc} */
@@ -203,7 +208,8 @@ public class ClientKeyValueView<K, V> implements
KeyValueView<K, V> {
writeKeyValueRaw(s, w, e.getKey(), e.getValue());
}
},
- r -> null);
+ r -> null,
+ ClientTupleSerializer.getHashFunction(tx, keySer.mapper(),
pairs.keySet().iterator().next()));
}
/** {@inheritDoc} */
@@ -221,7 +227,9 @@ public class ClientKeyValueView<K, V> implements
KeyValueView<K, V> {
return tbl.doSchemaOutInOpAsync(
ClientOp.TUPLE_GET_AND_UPSERT,
(s, w) -> writeKeyValue(s, w, tx, key, val),
- (s, r) -> valSer.readRec(s, r, TuplePart.VAL));
+ (s, r) -> valSer.readRec(s, r, TuplePart.VAL),
+ null,
+ ClientTupleSerializer.getHashFunction(tx, keySer.mapper(),
key));
}
/** {@inheritDoc} */
@@ -250,7 +258,8 @@ public class ClientKeyValueView<K, V> implements
KeyValueView<K, V> {
return tbl.doSchemaOutOpAsync(
ClientOp.TUPLE_INSERT,
(s, w) -> writeKeyValue(s, w, tx, key, val),
- ClientMessageUnpacker::unpackBoolean);
+ ClientMessageUnpacker::unpackBoolean,
+ ClientTupleSerializer.getHashFunction(tx, keySer.mapper(),
key));
}
/** {@inheritDoc} */
@@ -273,7 +282,8 @@ public class ClientKeyValueView<K, V> implements
KeyValueView<K, V> {
return tbl.doSchemaOutOpAsync(
ClientOp.TUPLE_DELETE,
(s, w) -> keySer.writeRec(tx, key, s, w, TuplePart.KEY),
- ClientMessageUnpacker::unpackBoolean);
+ ClientMessageUnpacker::unpackBoolean,
+ ClientTupleSerializer.getHashFunction(tx, keySer.mapper(),
key));
}
/** {@inheritDoc} */
@@ -284,7 +294,8 @@ public class ClientKeyValueView<K, V> implements
KeyValueView<K, V> {
return tbl.doSchemaOutOpAsync(
ClientOp.TUPLE_DELETE_EXACT,
(s, w) -> writeKeyValue(s, w, tx, key, val),
- ClientMessageUnpacker::unpackBoolean);
+ ClientMessageUnpacker::unpackBoolean,
+ ClientTupleSerializer.getHashFunction(tx, keySer.mapper(),
key));
}
/** {@inheritDoc} */
@@ -306,7 +317,8 @@ public class ClientKeyValueView<K, V> implements
KeyValueView<K, V> {
ClientOp.TUPLE_DELETE_ALL,
(s, w) -> keySer.writeRecs(tx, keys, s, w, TuplePart.KEY),
(s, r) -> keySer.readRecs(s, r, false, TuplePart.KEY),
- Collections.emptyList());
+ Collections.emptyList(),
+ ClientTupleSerializer.getHashFunction(tx, keySer.mapper(),
keys.iterator().next()));
}
/** {@inheritDoc} */
@@ -323,7 +335,9 @@ public class ClientKeyValueView<K, V> implements
KeyValueView<K, V> {
return tbl.doSchemaOutInOpAsync(
ClientOp.TUPLE_GET_AND_DELETE,
(s, w) -> keySer.writeRec(tx, key, s, w, TuplePart.KEY),
- (s, r) -> valSer.readRec(s, r, TuplePart.VAL));
+ (s, r) -> valSer.readRec(s, r, TuplePart.VAL),
+ null,
+ ClientTupleSerializer.getHashFunction(tx, keySer.mapper(),
key));
}
/** {@inheritDoc} */
@@ -360,7 +374,8 @@ public class ClientKeyValueView<K, V> implements
KeyValueView<K, V> {
return tbl.doSchemaOutOpAsync(
ClientOp.TUPLE_REPLACE,
(s, w) -> writeKeyValue(s, w, tx, key, val),
- ClientMessageUnpacker::unpackBoolean);
+ ClientMessageUnpacker::unpackBoolean,
+ ClientTupleSerializer.getHashFunction(tx, keySer.mapper(),
key));
}
/** {@inheritDoc} */
@@ -375,7 +390,8 @@ public class ClientKeyValueView<K, V> implements
KeyValueView<K, V> {
writeKeyValueRaw(s, w, key, oldVal);
writeKeyValueRaw(s, w, key, newVal);
},
- ClientMessageUnpacker::unpackBoolean);
+ ClientMessageUnpacker::unpackBoolean,
+ ClientTupleSerializer.getHashFunction(tx, keySer.mapper(),
key));
}
/** {@inheritDoc} */
@@ -393,7 +409,9 @@ public class ClientKeyValueView<K, V> implements
KeyValueView<K, V> {
return tbl.doSchemaOutInOpAsync(
ClientOp.TUPLE_GET_AND_REPLACE,
(s, w) -> writeKeyValue(s, w, tx, key, val),
- (s, r) -> valSer.readRec(s, r, TuplePart.VAL));
+ (s, r) -> valSer.readRec(s, r, TuplePart.VAL),
+ null,
+ ClientTupleSerializer.getHashFunction(tx, keySer.mapper(),
key));
}
/** {@inheritDoc} */
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java
index 9751119b7f..7847b23cd3 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java
@@ -25,10 +25,8 @@ import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.client.proto.ClientOp;
-import org.apache.ignite.internal.util.HashCalculator;
import org.apache.ignite.table.InvokeProcessor;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Tuple;
@@ -74,7 +72,7 @@ public class ClientRecordBinaryView implements
RecordView<Tuple> {
(s, w) -> ser.writeTuple(tx, keyRec, s, w, true),
(s, r) -> ClientTupleSerializer.readValueTuple(s, r, keyRec),
null,
- getHashFunction(tx, keyRec));
+ ClientTupleSerializer.getHashFunction(tx, keyRec));
}
/** {@inheritDoc} */
@@ -88,11 +86,16 @@ public class ClientRecordBinaryView implements
RecordView<Tuple> {
public @NotNull CompletableFuture<Collection<Tuple>> getAllAsync(@Nullable
Transaction tx, @NotNull Collection<Tuple> keyRecs) {
Objects.requireNonNull(keyRecs);
+ if (keyRecs.isEmpty()) {
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
+
return tbl.doSchemaOutInOpAsync(
ClientOp.TUPLE_GET_ALL,
(s, w) -> ser.writeTuples(tx, keyRecs, s, w, true),
ClientTupleSerializer::readTuplesNullable,
- Collections.emptyList());
+ Collections.emptyList(),
+ ClientTupleSerializer.getHashFunction(tx,
keyRecs.iterator().next()));
}
/** {@inheritDoc} */
@@ -109,7 +112,8 @@ public class ClientRecordBinaryView implements
RecordView<Tuple> {
return tbl.doSchemaOutOpAsync(
ClientOp.TUPLE_UPSERT,
(s, w) -> ser.writeTuple(tx, rec, s, w),
- r -> null);
+ r -> null,
+ ClientTupleSerializer.getHashFunction(tx, rec));
}
/** {@inheritDoc} */
@@ -123,10 +127,15 @@ public class ClientRecordBinaryView implements
RecordView<Tuple> {
public @NotNull CompletableFuture<Void> upsertAllAsync(@Nullable
Transaction tx, @NotNull Collection<Tuple> recs) {
Objects.requireNonNull(recs);
+ if (recs.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+
return tbl.doSchemaOutOpAsync(
ClientOp.TUPLE_UPSERT_ALL,
(s, w) -> ser.writeTuples(tx, recs, s, w, false),
- r -> null);
+ r -> null,
+ ClientTupleSerializer.getHashFunction(tx,
recs.iterator().next()));
}
/** {@inheritDoc} */
@@ -143,7 +152,9 @@ public class ClientRecordBinaryView implements
RecordView<Tuple> {
return tbl.doSchemaOutInOpAsync(
ClientOp.TUPLE_GET_AND_UPSERT,
(s, w) -> ser.writeTuple(tx, rec, s, w, false),
- (s, r) -> ClientTupleSerializer.readValueTuple(s, r, rec));
+ (s, r) -> ClientTupleSerializer.readValueTuple(s, r, rec),
+ null,
+ ClientTupleSerializer.getHashFunction(tx, rec));
}
/** {@inheritDoc} */
@@ -160,7 +171,8 @@ public class ClientRecordBinaryView implements
RecordView<Tuple> {
return tbl.doSchemaOutOpAsync(
ClientOp.TUPLE_INSERT,
(s, w) -> ser.writeTuple(tx, rec, s, w, false),
- ClientMessageUnpacker::unpackBoolean);
+ ClientMessageUnpacker::unpackBoolean,
+ ClientTupleSerializer.getHashFunction(tx, rec));
}
/** {@inheritDoc} */
@@ -174,11 +186,16 @@ public class ClientRecordBinaryView implements
RecordView<Tuple> {
public @NotNull CompletableFuture<Collection<Tuple>>
insertAllAsync(@Nullable Transaction tx, @NotNull Collection<Tuple> recs) {
Objects.requireNonNull(recs);
+ if (recs.isEmpty()) {
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
+
return tbl.doSchemaOutInOpAsync(
ClientOp.TUPLE_INSERT_ALL,
(s, w) -> ser.writeTuples(tx, recs, s, w, false),
ClientTupleSerializer::readTuples,
- Collections.emptyList());
+ Collections.emptyList(),
+ ClientTupleSerializer.getHashFunction(tx,
recs.iterator().next()));
}
/** {@inheritDoc} */
@@ -201,7 +218,8 @@ public class ClientRecordBinaryView implements
RecordView<Tuple> {
return tbl.doSchemaOutOpAsync(
ClientOp.TUPLE_REPLACE,
(s, w) -> ser.writeTuple(tx, rec, s, w, false),
- ClientMessageUnpacker::unpackBoolean);
+ ClientMessageUnpacker::unpackBoolean,
+ ClientTupleSerializer.getHashFunction(tx, rec));
}
/** {@inheritDoc} */
@@ -216,7 +234,8 @@ public class ClientRecordBinaryView implements
RecordView<Tuple> {
ser.writeTuple(tx, oldRec, s, w, false, false);
ser.writeTuple(tx, newRec, s, w, false, true);
},
- ClientMessageUnpacker::unpackBoolean);
+ ClientMessageUnpacker::unpackBoolean,
+ ClientTupleSerializer.getHashFunction(tx, oldRec));
}
/** {@inheritDoc} */
@@ -233,7 +252,9 @@ public class ClientRecordBinaryView implements
RecordView<Tuple> {
return tbl.doSchemaOutInOpAsync(
ClientOp.TUPLE_GET_AND_REPLACE,
(s, w) -> ser.writeTuple(tx, rec, s, w, false),
- (s, r) -> ClientTupleSerializer.readValueTuple(s, r, rec));
+ (s, r) -> ClientTupleSerializer.readValueTuple(s, r, rec),
+ null,
+ ClientTupleSerializer.getHashFunction(tx, rec));
}
/** {@inheritDoc} */
@@ -250,7 +271,8 @@ public class ClientRecordBinaryView implements
RecordView<Tuple> {
return tbl.doSchemaOutOpAsync(
ClientOp.TUPLE_DELETE,
(s, w) -> ser.writeTuple(tx, keyRec, s, w, true),
- ClientMessageUnpacker::unpackBoolean);
+ ClientMessageUnpacker::unpackBoolean,
+ ClientTupleSerializer.getHashFunction(tx, keyRec));
}
/** {@inheritDoc} */
@@ -267,7 +289,8 @@ public class ClientRecordBinaryView implements
RecordView<Tuple> {
return tbl.doSchemaOutOpAsync(
ClientOp.TUPLE_DELETE_EXACT,
(s, w) -> ser.writeTuple(tx, rec, s, w, false),
- ClientMessageUnpacker::unpackBoolean);
+ ClientMessageUnpacker::unpackBoolean,
+ ClientTupleSerializer.getHashFunction(tx, rec));
}
/** {@inheritDoc} */
@@ -284,7 +307,9 @@ public class ClientRecordBinaryView implements
RecordView<Tuple> {
return tbl.doSchemaOutInOpAsync(
ClientOp.TUPLE_GET_AND_DELETE,
(s, w) -> ser.writeTuple(tx, keyRec, s, w, true),
- (s, r) -> ClientTupleSerializer.readValueTuple(s, r, keyRec));
+ (s, r) -> ClientTupleSerializer.readValueTuple(s, r, keyRec),
+ null,
+ ClientTupleSerializer.getHashFunction(tx, keyRec));
}
/** {@inheritDoc} */
@@ -298,11 +323,16 @@ public class ClientRecordBinaryView implements
RecordView<Tuple> {
public @NotNull CompletableFuture<Collection<Tuple>>
deleteAllAsync(@Nullable Transaction tx, @NotNull Collection<Tuple> keyRecs) {
Objects.requireNonNull(keyRecs);
+ if (keyRecs.isEmpty()) {
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
+
return tbl.doSchemaOutInOpAsync(
ClientOp.TUPLE_DELETE_ALL,
(s, w) -> ser.writeTuples(tx, keyRecs, s, w, true),
(s, r) -> ClientTupleSerializer.readTuples(s, r, true),
- Collections.emptyList());
+ Collections.emptyList(),
+ ClientTupleSerializer.getHashFunction(tx,
keyRecs.iterator().next()));
}
/** {@inheritDoc} */
@@ -316,11 +346,16 @@ public class ClientRecordBinaryView implements
RecordView<Tuple> {
public @NotNull CompletableFuture<Collection<Tuple>>
deleteAllExactAsync(@Nullable Transaction tx, @NotNull Collection<Tuple> recs) {
Objects.requireNonNull(recs);
+ if (recs.isEmpty()) {
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
+
return tbl.doSchemaOutInOpAsync(
ClientOp.TUPLE_DELETE_ALL_EXACT,
(s, w) -> ser.writeTuples(tx, recs, s, w, false),
ClientTupleSerializer::readTuples,
- Collections.emptyList());
+ Collections.emptyList(),
+ ClientTupleSerializer.getHashFunction(tx,
recs.iterator().next()));
}
/** {@inheritDoc} */
@@ -358,21 +393,4 @@ public class ClientRecordBinaryView implements
RecordView<Tuple> {
) {
throw new UnsupportedOperationException("Not implemented yet.");
}
-
- private Integer getColocationHash(ClientSchema schema, Tuple rec) {
- var hashCalc = new HashCalculator();
-
- for (ClientColumn col : schema.colocationColumns()) {
- Object value = rec.valueOrDefault(col.name(), null);
- hashCalc.append(value);
- }
-
- return hashCalc.hash();
- }
-
- @Nullable
- private Function<ClientSchema, Integer> getHashFunction(@Nullable
Transaction tx, @NotNull Tuple rec) {
- // Disable partition awareness when transaction is used: tx belongs to
a default connection.
- return tx != null ? null : schema -> getColocationHash(schema, rec);
- }
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java
index 35a12fc181..3619a933f3 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java
@@ -70,7 +70,9 @@ public class ClientRecordView<R> implements RecordView<R> {
return tbl.doSchemaOutInOpAsync(
ClientOp.TUPLE_GET,
(s, w) -> ser.writeRec(tx, keyRec, s, w, TuplePart.KEY),
- (s, r) -> ser.readValRec(keyRec, s, r));
+ (s, r) -> ser.readValRec(keyRec, s, r),
+ null,
+ ClientTupleSerializer.getHashFunction(tx, ser.mapper(),
keyRec));
}
/** {@inheritDoc} */
@@ -84,11 +86,16 @@ public class ClientRecordView<R> implements RecordView<R> {
public @NotNull CompletableFuture<Collection<R>> getAllAsync(@Nullable
Transaction tx, @NotNull Collection<R> keyRecs) {
Objects.requireNonNull(keyRecs);
+ if (keyRecs.isEmpty()) {
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
+
return tbl.doSchemaOutInOpAsync(
ClientOp.TUPLE_GET_ALL,
(s, w) -> ser.writeRecs(tx, keyRecs, s, w, TuplePart.KEY),
(s, r) -> ser.readRecs(s, r, true, TuplePart.KEY_AND_VAL),
- Collections.emptyList());
+ Collections.emptyList(),
+ ClientTupleSerializer.getHashFunction(tx, ser.mapper(),
keyRecs.iterator().next()));
}
/** {@inheritDoc} */
@@ -105,7 +112,8 @@ public class ClientRecordView<R> implements RecordView<R> {
return tbl.doSchemaOutOpAsync(
ClientOp.TUPLE_UPSERT,
(s, w) -> ser.writeRec(tx, rec, s, w, TuplePart.KEY_AND_VAL),
- r -> null);
+ r -> null,
+ ClientTupleSerializer.getHashFunction(tx, ser.mapper(), rec));
}
/** {@inheritDoc} */
@@ -119,10 +127,15 @@ public class ClientRecordView<R> implements RecordView<R>
{
public @NotNull CompletableFuture<Void> upsertAllAsync(@Nullable
Transaction tx, @NotNull Collection<R> recs) {
Objects.requireNonNull(recs);
+ if (recs.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+
return tbl.doSchemaOutOpAsync(
ClientOp.TUPLE_UPSERT_ALL,
(s, w) -> ser.writeRecs(tx, recs, s, w, TuplePart.KEY_AND_VAL),
- r -> null);
+ r -> null,
+ ClientTupleSerializer.getHashFunction(tx, ser.mapper(),
recs.iterator().next()));
}
/** {@inheritDoc} */
@@ -139,7 +152,9 @@ public class ClientRecordView<R> implements RecordView<R> {
return tbl.doSchemaOutInOpAsync(
ClientOp.TUPLE_GET_AND_UPSERT,
(s, w) -> ser.writeRec(tx, rec, s, w, TuplePart.KEY_AND_VAL),
- (s, r) -> ser.readValRec(rec, s, r));
+ (s, r) -> ser.readValRec(rec, s, r),
+ null,
+ ClientTupleSerializer.getHashFunction(tx, ser.mapper(), rec));
}
/** {@inheritDoc} */
@@ -156,7 +171,8 @@ public class ClientRecordView<R> implements RecordView<R> {
return tbl.doSchemaOutOpAsync(
ClientOp.TUPLE_INSERT,
(s, w) -> ser.writeRec(tx, rec, s, w, TuplePart.KEY_AND_VAL),
- ClientMessageUnpacker::unpackBoolean);
+ ClientMessageUnpacker::unpackBoolean,
+ ClientTupleSerializer.getHashFunction(tx, ser.mapper(), rec));
}
/** {@inheritDoc} */
@@ -170,11 +186,16 @@ public class ClientRecordView<R> implements RecordView<R>
{
public @NotNull CompletableFuture<Collection<R>> insertAllAsync(@Nullable
Transaction tx, @NotNull Collection<R> recs) {
Objects.requireNonNull(recs);
+ if (recs.isEmpty()) {
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
+
return tbl.doSchemaOutInOpAsync(
ClientOp.TUPLE_INSERT_ALL,
(s, w) -> ser.writeRecs(tx, recs, s, w, TuplePart.KEY_AND_VAL),
(s, r) -> ser.readRecs(s, r, false, TuplePart.KEY_AND_VAL),
- Collections.emptyList());
+ Collections.emptyList(),
+ ClientTupleSerializer.getHashFunction(tx, ser.mapper(),
recs.iterator().next()));
}
/** {@inheritDoc} */
@@ -197,7 +218,8 @@ public class ClientRecordView<R> implements RecordView<R> {
return tbl.doSchemaOutOpAsync(
ClientOp.TUPLE_REPLACE,
(s, w) -> ser.writeRec(tx, rec, s, w, TuplePart.KEY_AND_VAL),
- ClientMessageUnpacker::unpackBoolean);
+ ClientMessageUnpacker::unpackBoolean,
+ ClientTupleSerializer.getHashFunction(tx, ser.mapper(), rec));
}
/** {@inheritDoc} */
@@ -209,7 +231,8 @@ public class ClientRecordView<R> implements RecordView<R> {
return tbl.doSchemaOutOpAsync(
ClientOp.TUPLE_REPLACE_EXACT,
(s, w) -> ser.writeRecs(tx, oldRec, newRec, s, w,
TuplePart.KEY_AND_VAL),
- ClientMessageUnpacker::unpackBoolean);
+ ClientMessageUnpacker::unpackBoolean,
+ ClientTupleSerializer.getHashFunction(tx, ser.mapper(),
oldRec));
}
/** {@inheritDoc} */
@@ -226,7 +249,9 @@ public class ClientRecordView<R> implements RecordView<R> {
return tbl.doSchemaOutInOpAsync(
ClientOp.TUPLE_GET_AND_REPLACE,
(s, w) -> ser.writeRec(tx, rec, s, w, TuplePart.KEY_AND_VAL),
- (s, r) -> ser.readValRec(rec, s, r));
+ (s, r) -> ser.readValRec(rec, s, r),
+ null,
+ ClientTupleSerializer.getHashFunction(tx, ser.mapper(), rec));
}
/** {@inheritDoc} */
@@ -243,7 +268,8 @@ public class ClientRecordView<R> implements RecordView<R> {
return tbl.doSchemaOutOpAsync(
ClientOp.TUPLE_DELETE,
(s, w) -> ser.writeRec(tx, keyRec, s, w, TuplePart.KEY),
- ClientMessageUnpacker::unpackBoolean);
+ ClientMessageUnpacker::unpackBoolean,
+ ClientTupleSerializer.getHashFunction(tx, ser.mapper(),
keyRec));
}
/** {@inheritDoc} */
@@ -260,7 +286,8 @@ public class ClientRecordView<R> implements RecordView<R> {
return tbl.doSchemaOutOpAsync(
ClientOp.TUPLE_DELETE_EXACT,
(s, w) -> ser.writeRec(tx, rec, s, w, TuplePart.KEY_AND_VAL),
- ClientMessageUnpacker::unpackBoolean);
+ ClientMessageUnpacker::unpackBoolean,
+ ClientTupleSerializer.getHashFunction(tx, ser.mapper(), rec));
}
/** {@inheritDoc} */
@@ -277,7 +304,9 @@ public class ClientRecordView<R> implements RecordView<R> {
return tbl.doSchemaOutInOpAsync(
ClientOp.TUPLE_GET_AND_DELETE,
(s, w) -> ser.writeRec(tx, keyRec, s, w, TuplePart.KEY),
- (s, r) -> ser.readValRec(keyRec, s, r));
+ (s, r) -> ser.readValRec(keyRec, s, r),
+ null,
+ ClientTupleSerializer.getHashFunction(tx, ser.mapper(),
keyRec));
}
/** {@inheritDoc} */
@@ -291,11 +320,16 @@ public class ClientRecordView<R> implements RecordView<R>
{
public @NotNull CompletableFuture<Collection<R>> deleteAllAsync(@Nullable
Transaction tx, @NotNull Collection<R> keyRecs) {
Objects.requireNonNull(keyRecs);
+ if (keyRecs.isEmpty()) {
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
+
return tbl.doSchemaOutInOpAsync(
ClientOp.TUPLE_DELETE_ALL,
(s, w) -> ser.writeRecs(tx, keyRecs, s, w, TuplePart.KEY),
(s, r) -> ser.readRecs(s, r, false, TuplePart.KEY),
- Collections.emptyList());
+ Collections.emptyList(),
+ ClientTupleSerializer.getHashFunction(tx, ser.mapper(),
keyRecs.iterator().next()));
}
/** {@inheritDoc} */
@@ -309,11 +343,16 @@ public class ClientRecordView<R> implements RecordView<R>
{
public @NotNull CompletableFuture<Collection<R>>
deleteAllExactAsync(@Nullable Transaction tx, @NotNull Collection<R> recs) {
Objects.requireNonNull(recs);
+ if (recs.isEmpty()) {
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
+
return tbl.doSchemaOutInOpAsync(
ClientOp.TUPLE_DELETE_ALL_EXACT,
(s, w) -> ser.writeRecs(tx, recs, s, w, TuplePart.KEY_AND_VAL),
(s, r) -> ser.readRecs(s, r, false, TuplePart.KEY_AND_VAL),
- Collections.emptyList());
+ Collections.emptyList(),
+ ClientTupleSerializer.getHashFunction(tx, ser.mapper(),
recs.iterator().next()));
}
/** {@inheritDoc} */
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
index 68ac843a7a..d7332807be 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
@@ -283,18 +283,8 @@ public class ClientTable implements Table {
return CompletableFuture.allOf(schemaFut, partitionsFut)
.thenCompose(v -> {
- List<String> partitions = partitionsFut.getNow(null);
ClientSchema schema = schemaFut.getNow(null);
-
- String preferredNodeId = null;
-
- if (partitions != null && partitions.size() > 0 &&
hashFunction != null) {
- Integer hash = hashFunction.apply(schema);
-
- if (hash != null) {
- preferredNodeId = partitions.get(Math.abs(hash %
partitions.size()));
- }
- }
+ String preferredNodeId = getPreferredNodeId(hashFunction,
partitionsFut.getNow(null), schema);
return ch.serviceAsync(opCode,
w -> writer.accept(schema, w),
@@ -325,6 +315,30 @@ public class ClientTable implements Table {
r -> reader.apply(r.in())));
}
+ <T> CompletableFuture<T> doSchemaOutOpAsync(
+ int opCode,
+ BiConsumer<ClientSchema, PayloadOutputChannel> writer,
+ Function<ClientMessageUnpacker, T> reader,
+ Function<ClientSchema, Integer> hashFunction) {
+
+ CompletableFuture<ClientSchema> schemaFut = getLatestSchema();
+ CompletableFuture<List<String>> partitionsFut = hashFunction == null
+ ? CompletableFuture.completedFuture(null)
+ : getPartitionAssignment();
+
+ return CompletableFuture.allOf(schemaFut, partitionsFut)
+ .thenCompose(v -> {
+ ClientSchema schema = schemaFut.getNow(null);
+ String preferredNodeId = getPreferredNodeId(hashFunction,
partitionsFut.getNow(null), schema);
+
+ return ch.serviceAsync(opCode,
+ w -> writer.accept(schema, w),
+ r -> reader.apply(r.in()),
+ null,
+ preferredNodeId);
+ });
+ }
+
private <T> Object readSchemaAndReadData(
ClientSchema knownSchema,
ClientMessageUnpacker in,
@@ -403,4 +417,19 @@ public class ClientTable implements Table {
return res;
});
}
+
+ @Nullable
+ private static String getPreferredNodeId(Function<ClientSchema, Integer>
hashFunction, List<String> partitions, ClientSchema schema) {
+ if (partitions == null || partitions.isEmpty() || hashFunction ==
null) {
+ return null;
+ }
+
+ Integer hash = hashFunction.apply(schema);
+
+ if (hash == null) {
+ return null;
+ }
+
+ return partitions.get(Math.abs(hash % partitions.size()));
+ }
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java
index 0bf002c89f..43add6b860 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java
@@ -33,15 +33,19 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.util.function.Function;
import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
import org.apache.ignite.internal.client.PayloadOutputChannel;
import org.apache.ignite.internal.client.proto.ClientBinaryTupleUtils;
import org.apache.ignite.internal.client.proto.ClientDataType;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.client.proto.TuplePart;
+import org.apache.ignite.internal.util.HashCalculator;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -439,4 +443,40 @@ public class ClientTupleSerializer {
throw new IgniteException(PROTOCOL_ERR, "Incorrect value type for
column '" + col.name() + "': " + e.getMessage(), e);
}
}
+
+ @Nullable
+ static Function<ClientSchema, Integer> getHashFunction(@Nullable
Transaction tx, @NotNull Tuple rec) {
+ // Disable partition awareness when transaction is used: tx belongs to
a default connection.
+ return tx != null ? null : schema -> getColocationHash(schema, rec);
+ }
+
+ @Nullable
+ static Function<ClientSchema, Integer> getHashFunction(@Nullable
Transaction tx, Mapper<?> mapper, @NotNull Object rec) {
+ // Disable partition awareness when transaction is used: tx belongs to
a default connection.
+ return tx != null ? null : schema -> getColocationHash(schema, mapper,
rec);
+ }
+
+ private static Integer getColocationHash(ClientSchema schema, Tuple rec) {
+ var hashCalc = new HashCalculator();
+
+ for (ClientColumn col : schema.colocationColumns()) {
+ Object value = rec.valueOrDefault(col.name(), null);
+ hashCalc.append(value);
+ }
+
+ return hashCalc.hash();
+ }
+
+ private static Integer getColocationHash(ClientSchema schema, Mapper<?>
mapper, Object rec) {
+ // Colocation columns are always part of the key -
https://cwiki.apache.org/confluence/display/IGNITE/IEP-86%3A+Colocation+Key.
+ var hashCalc = new HashCalculator();
+ var marsh = schema.getMarshaller(mapper, TuplePart.KEY);
+
+ for (ClientColumn col : schema.colocationColumns()) {
+ Object value = marsh.value(rec, col.schemaIndex());
+ hashCalc.append(value);
+ }
+
+ return hashCalc.hash();
+ }
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
b/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
index 371a63ccc6..16f9c9f9b2 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
@@ -21,6 +21,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import io.netty.util.ResourceLeakDetector;
import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
import org.apache.ignite.Ignite;
@@ -28,9 +30,11 @@ import org.apache.ignite.client.fakes.FakeIgnite;
import org.apache.ignite.client.fakes.FakeIgniteTables;
import org.apache.ignite.client.fakes.FakeInternalTable;
import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -93,7 +97,7 @@ public class PartitionAwarenessTest extends
AbstractClientTest {
}
@Test
- public void testGetRoutesRequestToPrimaryNode() {
+ public void testGetTupleRoutesRequestToPrimaryNode() {
RecordView<Tuple> recordView = defaultTable().recordView();
assertOpOnNode("server-1", "get", x -> recordView.get(null,
Tuple.create().set("ID", 0L)));
@@ -102,6 +106,37 @@ public class PartitionAwarenessTest extends
AbstractClientTest {
assertOpOnNode("server-2", "get", x -> recordView.get(null,
Tuple.create().set("ID", 3L)));
}
+ @Test
+ public void testGetRecordRoutesRequestToPrimaryNode() {
+ RecordView<AbstractClientTableTest.PersonPojo> pojoView =
defaultTable().recordView(
+ Mapper.of(AbstractClientTableTest.PersonPojo.class));
+
+ assertOpOnNode("server-1", "get", x -> pojoView.get(null, new
AbstractClientTableTest.PersonPojo(0L)));
+ assertOpOnNode("server-2", "get", x -> pojoView.get(null, new
AbstractClientTableTest.PersonPojo(1L)));
+ assertOpOnNode("server-1", "get", x -> pojoView.get(null, new
AbstractClientTableTest.PersonPojo(2L)));
+ assertOpOnNode("server-2", "get", x -> pojoView.get(null, new
AbstractClientTableTest.PersonPojo(3L)));
+ }
+
+ @Test
+ public void testGetKeyValueRoutesRequestToPrimaryNode() {
+ KeyValueView<Long, String> kvView =
defaultTable().keyValueView(Mapper.of(Long.class), Mapper.of(String.class));
+
+ assertOpOnNode("server-1", "get", x -> kvView.get(null, 0L));
+ assertOpOnNode("server-2", "get", x -> kvView.get(null, 1L));
+ assertOpOnNode("server-1", "get", x -> kvView.get(null, 2L));
+ assertOpOnNode("server-2", "get", x -> kvView.get(null, 3L));
+ }
+
+ @Test
+ public void testGetKeyValueBinaryRoutesRequestToPrimaryNode() {
+ KeyValueView<Tuple, Tuple> kvView = defaultTable().keyValueView();
+
+ assertOpOnNode("server-1", "get", x -> kvView.get(null,
Tuple.create().set("ID", 0L)));
+ assertOpOnNode("server-2", "get", x -> kvView.get(null,
Tuple.create().set("ID", 1L)));
+ assertOpOnNode("server-1", "get", x -> kvView.get(null,
Tuple.create().set("ID", 2L)));
+ assertOpOnNode("server-2", "get", x -> kvView.get(null,
Tuple.create().set("ID", 3L)));
+ }
+
@Test
public void testNonNullTxDisablesPartitionAwareness() {
RecordView<Tuple> recordView = defaultTable().recordView();
@@ -170,22 +205,210 @@ public class PartitionAwarenessTest extends
AbstractClientTest {
@Test
public void testAllRecordViewOperations() {
- // TODO IGNITE-17739 Add Partition Awareness to all table APIs
+ RecordView<AbstractClientTableTest.PersonPojo> pojoView =
defaultTable().recordView(
+ Mapper.of(AbstractClientTableTest.PersonPojo.class));
+
+ var t1 = new AbstractClientTableTest.PersonPojo(0L);
+ var t2 = new AbstractClientTableTest.PersonPojo(1L);
+
+ assertOpOnNode("server-1", "insert", x -> pojoView.insert(null, t1));
+ assertOpOnNode("server-2", "insert", x -> pojoView.insert(null, t2));
+
+ assertOpOnNode("server-1", "insertAll", x -> pojoView.insertAll(null,
List.of(t1)));
+ assertOpOnNode("server-2", "insertAll", x -> pojoView.insertAll(null,
List.of(t2)));
+
+ assertOpOnNode("server-1", "upsert", x -> pojoView.upsert(null, t1));
+ assertOpOnNode("server-2", "upsert", x -> pojoView.upsert(null, t2));
+
+ assertOpOnNode("server-1", "upsertAll", x -> pojoView.upsertAll(null,
List.of(t1)));
+ assertOpOnNode("server-2", "upsertAll", x -> pojoView.upsertAll(null,
List.of(t2)));
+
+ assertOpOnNode("server-1", "get", x -> pojoView.get(null, t1));
+ assertOpOnNode("server-2", "get", x -> pojoView.get(null, t2));
+
+ assertOpOnNode("server-1", "getAll", x -> pojoView.getAll(null,
List.of(t1)));
+ assertOpOnNode("server-2", "getAll", x -> pojoView.getAll(null,
List.of(t2)));
+
+ assertOpOnNode("server-1", "getAndUpsert", x ->
pojoView.getAndUpsert(null, t1));
+ assertOpOnNode("server-2", "getAndUpsert", x ->
pojoView.getAndUpsert(null, t2));
+
+ assertOpOnNode("server-1", "getAndReplace", x ->
pojoView.getAndReplace(null, t1));
+ assertOpOnNode("server-2", "getAndReplace", x ->
pojoView.getAndReplace(null, t2));
+
+ assertOpOnNode("server-1", "getAndDelete", x ->
pojoView.getAndDelete(null, t1));
+ assertOpOnNode("server-2", "getAndDelete", x ->
pojoView.getAndDelete(null, t2));
+
+ assertOpOnNode("server-1", "replace", x -> pojoView.replace(null, t1));
+ assertOpOnNode("server-2", "replace", x -> pojoView.replace(null, t2));
+
+ assertOpOnNode("server-1", "replace", x -> pojoView.replace(null, t1,
t1));
+ assertOpOnNode("server-2", "replace", x -> pojoView.replace(null, t2,
t2));
+
+ assertOpOnNode("server-1", "delete", x -> pojoView.delete(null, t1));
+ assertOpOnNode("server-2", "delete", x -> pojoView.delete(null, t2));
+
+ assertOpOnNode("server-1", "deleteExact", x ->
pojoView.deleteExact(null, t1));
+ assertOpOnNode("server-2", "deleteExact", x ->
pojoView.deleteExact(null, t2));
+
+ assertOpOnNode("server-1", "deleteAll", x -> pojoView.deleteAll(null,
List.of(t1)));
+ assertOpOnNode("server-2", "deleteAll", x -> pojoView.deleteAll(null,
List.of(t2)));
+
+ assertOpOnNode("server-1", "deleteAllExact", x ->
pojoView.deleteAllExact(null, List.of(t1)));
+ assertOpOnNode("server-2", "deleteAllExact", x ->
pojoView.deleteAllExact(null, List.of(t2)));
}
@Test
public void testAllRecordBinaryViewOperations() {
- // TODO IGNITE-17739 Add Partition Awareness to all table APIs
+ RecordView<Tuple> recordView = defaultTable().recordView();
+
+ Tuple t1 = Tuple.create().set("ID", 0L);
+ Tuple t2 = Tuple.create().set("ID", 1L);
+
+ assertOpOnNode("server-1", "insert", x -> recordView.insert(null, t1));
+ assertOpOnNode("server-2", "insert", x -> recordView.insert(null, t2));
+
+ assertOpOnNode("server-1", "insertAll", x ->
recordView.insertAll(null, List.of(t1)));
+ assertOpOnNode("server-2", "insertAll", x ->
recordView.insertAll(null, List.of(t2)));
+
+ assertOpOnNode("server-1", "upsert", x -> recordView.upsert(null, t1));
+ assertOpOnNode("server-2", "upsert", x -> recordView.upsert(null, t2));
+
+ assertOpOnNode("server-1", "upsertAll", x ->
recordView.upsertAll(null, List.of(t1)));
+ assertOpOnNode("server-2", "upsertAll", x ->
recordView.upsertAll(null, List.of(t2)));
+
+ assertOpOnNode("server-1", "get", x -> recordView.get(null, t1));
+ assertOpOnNode("server-2", "get", x -> recordView.get(null, t2));
+
+ assertOpOnNode("server-1", "getAll", x -> recordView.getAll(null,
List.of(t1)));
+ assertOpOnNode("server-2", "getAll", x -> recordView.getAll(null,
List.of(t2)));
+
+ assertOpOnNode("server-1", "getAndUpsert", x ->
recordView.getAndUpsert(null, t1));
+ assertOpOnNode("server-2", "getAndUpsert", x ->
recordView.getAndUpsert(null, t2));
+
+ assertOpOnNode("server-1", "getAndReplace", x ->
recordView.getAndReplace(null, t1));
+ assertOpOnNode("server-2", "getAndReplace", x ->
recordView.getAndReplace(null, t2));
+
+ assertOpOnNode("server-1", "getAndDelete", x ->
recordView.getAndDelete(null, t1));
+ assertOpOnNode("server-2", "getAndDelete", x ->
recordView.getAndDelete(null, t2));
+
+ assertOpOnNode("server-1", "replace", x -> recordView.replace(null,
t1));
+ assertOpOnNode("server-2", "replace", x -> recordView.replace(null,
t2));
+
+ assertOpOnNode("server-1", "replace", x -> recordView.replace(null,
t1, t1));
+ assertOpOnNode("server-2", "replace", x -> recordView.replace(null,
t2, t2));
+
+ assertOpOnNode("server-1", "delete", x -> recordView.delete(null, t1));
+ assertOpOnNode("server-2", "delete", x -> recordView.delete(null, t2));
+
+ assertOpOnNode("server-1", "deleteExact", x ->
recordView.deleteExact(null, t1));
+ assertOpOnNode("server-2", "deleteExact", x ->
recordView.deleteExact(null, t2));
+
+ assertOpOnNode("server-1", "deleteAll", x ->
recordView.deleteAll(null, List.of(t1)));
+ assertOpOnNode("server-2", "deleteAll", x ->
recordView.deleteAll(null, List.of(t2)));
+
+ assertOpOnNode("server-1", "deleteAllExact", x ->
recordView.deleteAllExact(null, List.of(t1)));
+ assertOpOnNode("server-2", "deleteAllExact", x ->
recordView.deleteAllExact(null, List.of(t2)));
}
@Test
public void testAllKeyValueViewOperations() {
- // TODO IGNITE-17739 Add Partition Awareness to all table APIs
+ KeyValueView<Long, String> kvView =
defaultTable().keyValueView(Mapper.of(Long.class), Mapper.of(String.class));
+
+ var k1 = 0L;
+ var k2 = 1L;
+ var v = "v";
+
+ assertOpOnNode("server-1", "insert", x -> kvView.putIfAbsent(null, k1,
v));
+ assertOpOnNode("server-2", "insert", x -> kvView.putIfAbsent(null, k2,
v));
+
+ assertOpOnNode("server-1", "upsert", x -> kvView.put(null, k1, v));
+ assertOpOnNode("server-2", "upsert", x -> kvView.put(null, k2, v));
+
+ assertOpOnNode("server-1", "upsertAll", x -> kvView.putAll(null,
Map.of(k1, v)));
+ assertOpOnNode("server-2", "upsertAll", x -> kvView.putAll(null,
Map.of(k2, v)));
+
+ assertOpOnNode("server-1", "get", x -> kvView.get(null, k1));
+ assertOpOnNode("server-2", "get", x -> kvView.get(null, k2));
+
+ assertOpOnNode("server-1", "get", x -> kvView.contains(null, k1));
+ assertOpOnNode("server-2", "get", x -> kvView.contains(null, k2));
+
+ assertOpOnNode("server-1", "getAll", x -> kvView.getAll(null,
List.of(k1)));
+ assertOpOnNode("server-2", "getAll", x -> kvView.getAll(null,
List.of(k2)));
+
+ assertOpOnNode("server-1", "getAndUpsert", x -> kvView.getAndPut(null,
k1, v));
+ assertOpOnNode("server-2", "getAndUpsert", x -> kvView.getAndPut(null,
k2, v));
+
+ assertOpOnNode("server-1", "getAndReplace", x ->
kvView.getAndReplace(null, k1, v));
+ assertOpOnNode("server-2", "getAndReplace", x ->
kvView.getAndReplace(null, k2, v));
+
+ assertOpOnNode("server-1", "getAndDelete", x ->
kvView.getAndRemove(null, k1));
+ assertOpOnNode("server-2", "getAndDelete", x ->
kvView.getAndRemove(null, k2));
+
+ assertOpOnNode("server-1", "replace", x -> kvView.replace(null, k1,
v));
+ assertOpOnNode("server-2", "replace", x -> kvView.replace(null, k2,
v));
+
+ assertOpOnNode("server-1", "replace", x -> kvView.replace(null, k1, v,
v));
+ assertOpOnNode("server-2", "replace", x -> kvView.replace(null, k2, v,
v));
+
+ assertOpOnNode("server-1", "delete", x -> kvView.remove(null, k1));
+ assertOpOnNode("server-2", "delete", x -> kvView.remove(null, k2));
+
+ assertOpOnNode("server-1", "deleteExact", x -> kvView.remove(null, k1,
v));
+ assertOpOnNode("server-2", "deleteExact", x -> kvView.remove(null, k2,
v));
+
+ assertOpOnNode("server-1", "deleteAll", x -> kvView.removeAll(null,
List.of(k1)));
+ assertOpOnNode("server-2", "deleteAll", x -> kvView.removeAll(null,
List.of(k2)));
}
@Test
public void testAllKeyValueBinaryViewOperations() {
- // TODO IGNITE-17739 Add Partition Awareness to all table APIs
+ KeyValueView<Tuple, Tuple> kvView = defaultTable().keyValueView();
+
+ Tuple t1 = Tuple.create().set("ID", 0L);
+ Tuple t2 = Tuple.create().set("ID", 1L);
+
+ assertOpOnNode("server-1", "insert", x -> kvView.putIfAbsent(null, t1,
t1));
+ assertOpOnNode("server-2", "insert", x -> kvView.putIfAbsent(null, t2,
t2));
+
+ assertOpOnNode("server-1", "upsert", x -> kvView.put(null, t1, t1));
+ assertOpOnNode("server-2", "upsert", x -> kvView.put(null, t2, t2));
+
+ assertOpOnNode("server-1", "upsertAll", x -> kvView.putAll(null,
Map.of(t1, t1)));
+ assertOpOnNode("server-2", "upsertAll", x -> kvView.putAll(null,
Map.of(t2, t2)));
+
+ assertOpOnNode("server-1", "get", x -> kvView.get(null, t1));
+ assertOpOnNode("server-2", "get", x -> kvView.get(null, t2));
+
+ assertOpOnNode("server-1", "get", x -> kvView.contains(null, t1));
+ assertOpOnNode("server-2", "get", x -> kvView.contains(null, t2));
+
+ assertOpOnNode("server-1", "getAll", x -> kvView.getAll(null,
List.of(t1)));
+ assertOpOnNode("server-2", "getAll", x -> kvView.getAll(null,
List.of(t2)));
+
+ assertOpOnNode("server-1", "getAndUpsert", x -> kvView.getAndPut(null,
t1, t1));
+ assertOpOnNode("server-2", "getAndUpsert", x -> kvView.getAndPut(null,
t2, t2));
+
+ assertOpOnNode("server-1", "getAndReplace", x ->
kvView.getAndReplace(null, t1, t1));
+ assertOpOnNode("server-2", "getAndReplace", x ->
kvView.getAndReplace(null, t2, t2));
+
+ assertOpOnNode("server-1", "getAndDelete", x ->
kvView.getAndRemove(null, t1));
+ assertOpOnNode("server-2", "getAndDelete", x ->
kvView.getAndRemove(null, t2));
+
+ assertOpOnNode("server-1", "replace", x -> kvView.replace(null, t1,
t1));
+ assertOpOnNode("server-2", "replace", x -> kvView.replace(null, t2,
t2));
+
+ assertOpOnNode("server-1", "replace", x -> kvView.replace(null, t1,
t1, t1));
+ assertOpOnNode("server-2", "replace", x -> kvView.replace(null, t2,
t2, t2));
+
+ assertOpOnNode("server-1", "delete", x -> kvView.remove(null, t1));
+ assertOpOnNode("server-2", "delete", x -> kvView.remove(null, t2));
+
+ assertOpOnNode("server-1", "deleteExact", x -> kvView.remove(null, t1,
t1));
+ assertOpOnNode("server-2", "deleteExact", x -> kvView.remove(null, t2,
t2));
+
+ assertOpOnNode("server-1", "deleteAll", x -> kvView.removeAll(null,
List.of(t1)));
+ assertOpOnNode("server-2", "deleteAll", x -> kvView.removeAll(null,
List.of(t2)));
}
private void assertOpOnNode(String expectedNode, String expectedOp,
Consumer<Void> op) {
@@ -194,8 +417,8 @@ public class PartitionAwarenessTest extends
AbstractClientTest {
op.accept(null);
- assertEquals(expectedNode, lastOpServerName);
assertEquals(expectedOp, lastOp);
+ assertEquals(expectedNode, lastOpServerName, "Operation " + expectedOp
+ " was not executed on expected node");
}
private Table defaultTable() {
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
index 8750bbbfeb..c2c33a67da 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
@@ -179,7 +179,7 @@ public class RetryPolicyTest {
@Test
public void testRetryReadPolicyDoesNotRetryWriteOperations() throws
Exception {
- initServer(reqId -> reqId % 5 == 0);
+ initServer(reqId -> reqId % 6 == 0);
try (var client = getClient(new RetryReadPolicy())) {
RecordView<Tuple> recView =
client.tables().table("t").recordView();
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 3c5f46226a..68a006326b 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -100,8 +100,6 @@ public class FakeInternalTable implements InternalTable {
@Override
public CompletableFuture<Collection<BinaryRow>>
getAll(Collection<BinaryRowEx> keyRows,
@Nullable InternalTransaction tx) {
- onDataAccess("getAll", keyRows);
-
var res = new ArrayList<BinaryRow>();
for (var key : keyRows) {
@@ -112,6 +110,7 @@ public class FakeInternalTable implements InternalTable {
}
}
+ onDataAccess("getAll", keyRows);
return CompletableFuture.completedFuture(res);
}
@@ -128,12 +127,11 @@ public class FakeInternalTable implements InternalTable {
/** {@inheritDoc} */
@Override
public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows,
@Nullable InternalTransaction tx) {
- onDataAccess("upsertAll", rows);
-
for (var row : rows) {
upsert(row, tx);
}
+ onDataAccess("upsertAll", rows);
return CompletableFuture.completedFuture(null);
}
@@ -141,36 +139,33 @@ public class FakeInternalTable implements InternalTable {
@Override
public CompletableFuture<BinaryRow> getAndUpsert(BinaryRowEx row,
@Nullable InternalTransaction tx) {
- onDataAccess("getAndUpsert", row);
-
var res = get(row, tx);
upsert(row, tx);
+ onDataAccess("getAndUpsert", row);
return CompletableFuture.completedFuture(res.getNow(null));
}
/** {@inheritDoc} */
@Override
public CompletableFuture<Boolean> insert(BinaryRowEx row, @Nullable
InternalTransaction tx) {
- onDataAccess("insert", row);
-
var old = get(row, tx).getNow(null);
+ boolean res = false;
if (old == null) {
upsert(row, tx);
- return CompletableFuture.completedFuture(true);
+ res = true;
}
- return CompletableFuture.completedFuture(false);
+ onDataAccess("insert", row);
+ return CompletableFuture.completedFuture(res);
}
/** {@inheritDoc} */
@Override
public CompletableFuture<Collection<BinaryRow>>
insertAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
- onDataAccess("insertAll", rows);
-
var skipped = new ArrayList<BinaryRow>();
for (var row : rows) {
@@ -179,97 +174,100 @@ public class FakeInternalTable implements InternalTable {
}
}
+ onDataAccess("insertAll", rows);
return CompletableFuture.completedFuture(skipped);
}
/** {@inheritDoc} */
@Override
public CompletableFuture<Boolean> replace(BinaryRowEx row, @Nullable
InternalTransaction tx) {
- onDataAccess("replace", row);
-
var old = get(row, tx).getNow(null);
if (old == null) {
+ onDataAccess("replace", row);
return CompletableFuture.completedFuture(false);
}
- return upsert(row, tx).thenApply(f -> true);
+ CompletableFuture<Void> upsert = upsert(row, tx);
+
+ onDataAccess("replace", row);
+ return upsert.thenApply(f -> true);
}
/** {@inheritDoc} */
@Override
public CompletableFuture<Boolean> replace(BinaryRowEx oldRow, BinaryRowEx
newRow, @Nullable InternalTransaction tx) {
- onDataAccess("replace", oldRow);
-
var old = get(oldRow, tx).getNow(null);
if (old == null || !old.valueSlice().equals(oldRow.valueSlice())) {
+ onDataAccess("replace", oldRow);
return CompletableFuture.completedFuture(false);
}
- return upsert(newRow, tx).thenApply(f -> true);
+ CompletableFuture<Void> upsert = upsert(newRow, tx);
+
+ onDataAccess("replace", oldRow);
+ return upsert.thenApply(f -> true);
}
/** {@inheritDoc} */
@Override
public CompletableFuture<BinaryRow> getAndReplace(BinaryRowEx row,
@Nullable InternalTransaction tx) {
- onDataAccess("getAndReplace", row);
-
var old = get(row, tx);
- return replace(row, tx).thenCompose(f -> old);
+ CompletableFuture<Boolean> replace = replace(row, tx);
+
+ onDataAccess("getAndReplace", row);
+ return replace.thenCompose(f -> old);
}
/** {@inheritDoc} */
@Override
public CompletableFuture<Boolean> delete(BinaryRowEx keyRow, @Nullable
InternalTransaction tx) {
- onDataAccess("delete", keyRow);
-
var old = get(keyRow, tx).getNow(null);
if (old != null) {
data.remove(keyRow.keySlice());
}
+ onDataAccess("delete", keyRow);
return CompletableFuture.completedFuture(old != null);
}
/** {@inheritDoc} */
@Override
public CompletableFuture<Boolean> deleteExact(BinaryRowEx oldRow,
@Nullable InternalTransaction tx) {
- onDataAccess("deleteExact", oldRow);
+ var res = false;
var old = get(oldRow, tx).getNow(null);
if (old != null && old.valueSlice().equals(oldRow.valueSlice())) {
data.remove(oldRow.keySlice());
- return CompletableFuture.completedFuture(true);
+ res = true;
}
- return CompletableFuture.completedFuture(false);
+ onDataAccess("deleteExact", oldRow);
+ return CompletableFuture.completedFuture(res);
}
/** {@inheritDoc} */
@Override
public CompletableFuture<BinaryRow> getAndDelete(BinaryRowEx row,
@Nullable InternalTransaction tx) {
- onDataAccess("getAndDelete", row);
-
var old = get(row, tx).getNow(null);
if (old != null) {
data.remove(row.keySlice());
}
+ onDataAccess("getAndDelete", row);
return CompletableFuture.completedFuture(old);
}
/** {@inheritDoc} */
@Override
public CompletableFuture<Collection<BinaryRow>>
deleteAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
- onDataAccess("deleteAll", rows);
-
var skipped = new ArrayList<BinaryRow>();
for (var row : rows) {
@@ -278,14 +276,13 @@ public class FakeInternalTable implements InternalTable {
}
}
+ onDataAccess("deleteAll", rows);
return CompletableFuture.completedFuture(skipped);
}
/** {@inheritDoc} */
@Override
public CompletableFuture<Collection<BinaryRow>>
deleteAllExact(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
- onDataAccess("deleteAllExact", rows);
-
var skipped = new ArrayList<BinaryRow>();
for (var row : rows) {
@@ -294,6 +291,7 @@ public class FakeInternalTable implements InternalTable {
}
}
+ onDataAccess("deleteAllExact", rows);
return CompletableFuture.completedFuture(skipped);
}