This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6fc263e3ec IGNITE-23092 Compute: allow nested tuples in arguments and
results (#4456)
6fc263e3ec is described below
commit 6fc263e3ec39882baf5c9881725e790617730b59
Author: Maksim Myskov <[email protected]>
AuthorDate: Wed Oct 2 12:38:29 2024 +0300
IGNITE-23092 Compute: allow nested tuples in arguments and results (#4456)
In addition to existing Tuple support as Compute API arguments and results,
allow nested Tuples (Tuple in Tuple).
---
.../inlineschema/TupleWithSchemaMarshalling.java | 49 +++++++++++++++-------
.../TupleWithSchemaMarshallingTest.java | 9 ++--
.../ignite/internal/compute/ItComputeBaseTest.java | 44 +++++++++++++++++++
3 files changed, 84 insertions(+), 18 deletions(-)
diff --git
a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/inlineschema/TupleWithSchemaMarshalling.java
b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/inlineschema/TupleWithSchemaMarshalling.java
index ace65ac1e1..ce28666a2c 100644
---
a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/inlineschema/TupleWithSchemaMarshalling.java
+++
b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/inlineschema/TupleWithSchemaMarshalling.java
@@ -43,6 +43,8 @@ import org.jetbrains.annotations.Nullable;
public final class TupleWithSchemaMarshalling {
private static final ByteOrder BYTE_ORDER = ByteOrder.LITTLE_ENDIAN;
+ private static final int NESTED_TUPLE_FLAG = -1;
+
/**
* Marshal tuple in the following format (LITTLE_ENDIAN).
*
@@ -68,18 +70,18 @@ public final class TupleWithSchemaMarshalling {
int size = tuple.columnCount();
Object[] values = new Object[size];
String[] columns = new String[size];
- ColumnType[] types = new ColumnType[size];
+ int[] colTypeIds = new int[size];
// Fill in the values, column names, and types.
for (int i = 0; i < size; i++) {
var value = tuple.value(i);
values[i] = value;
columns[i] = tuple.columnName(i);
- types[i] = inferType(value);
+ colTypeIds[i] = getColumnTypeId(value);
}
- ByteBuffer schemaBuff = schemaBuilder(columns, types).build();
- ByteBuffer valueBuff = valueBuilder(columns, types, values).build();
+ ByteBuffer schemaBuff = schemaBuilder(columns, colTypeIds).build();
+ ByteBuffer valueBuff = valueBuilder(columns, values).build();
int schemaBuffLen = schemaBuff.remaining();
int valueBuffLen = valueBuff.remaining();
@@ -150,37 +152,40 @@ public final class TupleWithSchemaMarshalling {
String colName = schemaReader.stringValue(i * 2);
int colTypeId = schemaReader.intValue(i * 2 + 1);
- setColumnValue(valueReader, tup, colName,
ColumnType.getById(colTypeId), i);
+ setColumnValue(valueReader, tup, colName, colTypeId, i);
}
return tup;
}
- private static BinaryTupleBuilder schemaBuilder(String[] columns,
ColumnType[] types) {
+ private static BinaryTupleBuilder schemaBuilder(String[] columns, int[]
colTypeIds) {
BinaryTupleBuilder builder = new BinaryTupleBuilder(columns.length *
2);
for (int i = 0; i < columns.length; i++) {
builder.appendString(columns[i]);
- builder.appendInt(types[i].id());
+ builder.appendInt(colTypeIds[i]);
}
return builder;
}
- private static BinaryTupleBuilder valueBuilder(String[] columnNames,
ColumnType[] types, Object[] values) {
+ private static BinaryTupleBuilder valueBuilder(String[] columnNames,
Object[] values) {
BinaryTupleBuilder builder = new BinaryTupleBuilder(values.length);
for (int i = 0; i < values.length; i++) {
- ColumnType type = types[i];
- Object v = values[i];
-
- append(type, columnNames[i], builder, v);
+ append(columnNames[i], builder, values[i]);
}
return builder;
}
- private static void append(ColumnType type, String name,
BinaryTupleBuilder builder, Object value) {
+ private static void append(String name, BinaryTupleBuilder builder, Object
value) {
+ if (value instanceof Tuple) {
+ builder.appendBytes(marshal((Tuple) value));
+ return;
+ }
+
+ ColumnType type = inferType(value);
try {
switch (type) {
case NULL:
@@ -250,6 +255,14 @@ public final class TupleWithSchemaMarshalling {
}
}
+ private static int getColumnTypeId(@Nullable Object value) {
+ if (value instanceof Tuple) {
+ return NESTED_TUPLE_FLAG;
+ } else {
+ return inferType(value).id();
+ }
+ }
+
private static ColumnType inferType(@Nullable Object value) {
if (value == null) {
return ColumnType.NULL;
@@ -309,7 +322,15 @@ public final class TupleWithSchemaMarshalling {
}
- private static void setColumnValue(BinaryTupleReader reader, Tuple tuple,
String colName, ColumnType colType, int i) {
+ private static void setColumnValue(BinaryTupleReader reader, Tuple tuple,
String colName, int colTypeId, int i) {
+ if (colTypeId == NESTED_TUPLE_FLAG) {
+ byte[] nestedTupleBytes = reader.bytesValue(i);
+ Tuple nestedTuple = unmarshal(nestedTupleBytes);
+ tuple.set(colName, nestedTuple);
+ return;
+ }
+
+ ColumnType colType = ColumnType.getById(colTypeId);
switch (colType) {
case NULL:
tuple.set(colName, null);
diff --git
a/modules/binary-tuple/src/test/java/org/apache/ignite/internal/binarytuple/inlineschema/TupleWithSchemaMarshallingTest.java
b/modules/binary-tuple/src/test/java/org/apache/ignite/internal/binarytuple/inlineschema/TupleWithSchemaMarshallingTest.java
index 687ada6bd1..6c448a5a1f 100644
---
a/modules/binary-tuple/src/test/java/org/apache/ignite/internal/binarytuple/inlineschema/TupleWithSchemaMarshallingTest.java
+++
b/modules/binary-tuple/src/test/java/org/apache/ignite/internal/binarytuple/inlineschema/TupleWithSchemaMarshallingTest.java
@@ -57,7 +57,8 @@ class TupleWithSchemaMarshallingTest {
Tuple.create().set("col14", "string"),
Tuple.create().set("col15", new byte[]{1, 2, 3}),
Tuple.create().set("col16", Period.ofDays(10)),
- Tuple.create().set("col17", Duration.ofDays(10))
+ Tuple.create().set("col17", Duration.ofDays(10)),
+ Tuple.create().set("col18", Tuple.create().set("col1", 1))
).map(Arguments::of);
}
@@ -67,8 +68,7 @@ class TupleWithSchemaMarshallingTest {
Tuple.create().set("col1", 1).set("col2", new Object()),
Tuple.create().set("col", new ArrayList<>()),
Tuple.create().set("col", new HashMap<>()),
- Tuple.create().set("col", new HashMap<>()),
- Tuple.create().set("col", Tuple.create())
+ Tuple.create().set("col", new HashMap<>())
).map(Arguments::of);
}
@@ -106,7 +106,8 @@ class TupleWithSchemaMarshallingTest {
.set("col14", "string")
.set("col15", new byte[]{1, 2, 3})
.set("col16", Period.ofDays(10))
- .set("col17", Duration.ofDays(10));
+ .set("col17", Duration.ofDays(10))
+ .set("col18", Tuple.create().set("col1", 1));
byte[] marshalled = TupleWithSchemaMarshalling.marshal(tuple);
assertEquals(tuple, TupleWithSchemaMarshalling.unmarshal(marshalled));
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
index 6e7c815b22..d8c09ecf63 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
@@ -48,9 +48,11 @@ import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.compute.ComputeException;
+import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecution;
+import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.compute.JobTarget;
import org.apache.ignite.compute.TaskDescriptor;
import org.apache.ignite.compute.task.TaskExecution;
@@ -489,6 +491,41 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
}
+ /**
+ * Tests that the nested tuples are correctly serialized and deserialized.
+ */
+ @Test
+ void nestedTuplesArgumentSerialization() {
+ Ignite entryNode = node(0);
+ String address = "127.0.0.1:" +
unwrapIgniteImpl(entryNode).clientAddress().port();
+ try (IgniteClient client =
IgniteClient.builder().addresses(address).build()) {
+ var argument = Tuple.create(
+ Map.of("level1_key1", Tuple.create(
+ Map.of("level2_key1", Tuple.create(
+ Map.of("level3_key1",
"level3_value1"))
+ )
+ ),
+ "level1_key2", Tuple.create(
+ Map.of("level2_key1", Tuple.create(
+ Map.of("level3_key1",
"level3_value1"))
+ )
+ ),
+ "level1_key3", "Non-tuple-string-value",
+ "level1_key4", 42
+ )
+ );
+
+ Tuple resultTuple = client.compute().execute(
+ JobTarget.node(clusterNode(node(1))),
+ JobDescriptor.builder(TupleComputeJob.class)
+ .build(),
+ argument
+ );
+
+ assertThat(resultTuple, equalTo(argument));
+ }
+ }
+
static Ignite node(int i) {
return CLUSTER.node(i);
}
@@ -509,6 +546,13 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
return PojoJob.class.getName();
}
+ private static class TupleComputeJob implements ComputeJob<Tuple, Tuple> {
+
+ @Override
+ public @Nullable CompletableFuture<Tuple>
executeAsync(JobExecutionContext context, @Nullable Tuple arg) {
+ return CompletableFuture.completedFuture(arg);
+ }
+ }
static Class<PojoJob> pojoJobClass() {
return PojoJob.class;