This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 6fc263e3ec IGNITE-23092 Compute: allow nested tuples in arguments and results (#4456)
6fc263e3ec is described below

commit 6fc263e3ec39882baf5c9881725e790617730b59
Author: Maksim Myskov <[email protected]>
AuthorDate: Wed Oct 2 12:38:29 2024 +0300

    IGNITE-23092 Compute: allow nested tuples in arguments and results (#4456)
    
    In addition to existing Tuple support as Compute API arguments and results, allow nested Tuples (Tuple in Tuple).
---
 .../inlineschema/TupleWithSchemaMarshalling.java   | 49 +++++++++++++++-------
 .../TupleWithSchemaMarshallingTest.java            |  9 ++--
 .../ignite/internal/compute/ItComputeBaseTest.java | 44 +++++++++++++++++++
 3 files changed, 84 insertions(+), 18 deletions(-)

diff --git a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/inlineschema/TupleWithSchemaMarshalling.java b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/inlineschema/TupleWithSchemaMarshalling.java
index ace65ac1e1..ce28666a2c 100644
--- a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/inlineschema/TupleWithSchemaMarshalling.java
+++ b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/inlineschema/TupleWithSchemaMarshalling.java
@@ -43,6 +43,8 @@ import org.jetbrains.annotations.Nullable;
 public final class TupleWithSchemaMarshalling {
     private static final ByteOrder BYTE_ORDER = ByteOrder.LITTLE_ENDIAN;
 
+    private static final int NESTED_TUPLE_FLAG = -1;
+
     /**
      * Marshal tuple in the following format (LITTLE_ENDIAN).
      *
@@ -68,18 +70,18 @@ public final class TupleWithSchemaMarshalling {
         int size = tuple.columnCount();
         Object[] values = new Object[size];
         String[] columns = new String[size];
-        ColumnType[] types = new ColumnType[size];
+        int[] colTypeIds = new int[size];
 
         // Fill in the values, column names, and types.
         for (int i = 0; i < size; i++) {
             var value = tuple.value(i);
             values[i] = value;
             columns[i] = tuple.columnName(i);
-            types[i] = inferType(value);
+            colTypeIds[i] = getColumnTypeId(value);
         }
 
-        ByteBuffer schemaBuff = schemaBuilder(columns, types).build();
-        ByteBuffer valueBuff = valueBuilder(columns, types, values).build();
+        ByteBuffer schemaBuff = schemaBuilder(columns, colTypeIds).build();
+        ByteBuffer valueBuff = valueBuilder(columns, values).build();
 
         int schemaBuffLen = schemaBuff.remaining();
         int valueBuffLen = valueBuff.remaining();
@@ -150,37 +152,40 @@ public final class TupleWithSchemaMarshalling {
             String colName = schemaReader.stringValue(i * 2);
             int colTypeId = schemaReader.intValue(i * 2 + 1);
 
-            setColumnValue(valueReader, tup, colName, 
ColumnType.getById(colTypeId), i);
+            setColumnValue(valueReader, tup, colName, colTypeId, i);
         }
 
         return tup;
     }
 
-    private static BinaryTupleBuilder schemaBuilder(String[] columns, 
ColumnType[] types) {
+    private static BinaryTupleBuilder schemaBuilder(String[] columns, int[] 
colTypeIds) {
         BinaryTupleBuilder builder = new BinaryTupleBuilder(columns.length * 
2);
 
         for (int i = 0; i < columns.length; i++) {
             builder.appendString(columns[i]);
-            builder.appendInt(types[i].id());
+            builder.appendInt(colTypeIds[i]);
         }
 
         return builder;
     }
 
-    private static BinaryTupleBuilder valueBuilder(String[] columnNames, 
ColumnType[] types, Object[] values) {
+    private static BinaryTupleBuilder valueBuilder(String[] columnNames, 
Object[] values) {
         BinaryTupleBuilder builder = new BinaryTupleBuilder(values.length);
 
         for (int i = 0; i < values.length; i++) {
-            ColumnType type = types[i];
-            Object v = values[i];
-
-            append(type, columnNames[i], builder, v);
+            append(columnNames[i], builder, values[i]);
         }
 
         return builder;
     }
 
-    private static void append(ColumnType type, String name, 
BinaryTupleBuilder builder, Object value) {
+    private static void append(String name, BinaryTupleBuilder builder, Object 
value) {
+        if (value instanceof Tuple) {
+            builder.appendBytes(marshal((Tuple) value));
+            return;
+        }
+
+        ColumnType type = inferType(value);
         try {
             switch (type) {
                 case NULL:
@@ -250,6 +255,14 @@ public final class TupleWithSchemaMarshalling {
         }
     }
 
+    private static int getColumnTypeId(@Nullable Object value) {
+        if (value instanceof Tuple) {
+            return NESTED_TUPLE_FLAG;
+        } else {
+            return inferType(value).id();
+        }
+    }
+
     private static ColumnType inferType(@Nullable Object value) {
         if (value == null) {
             return ColumnType.NULL;
@@ -309,7 +322,15 @@ public final class TupleWithSchemaMarshalling {
     }
 
 
-    private static void setColumnValue(BinaryTupleReader reader, Tuple tuple, 
String colName, ColumnType colType, int i) {
+    private static void setColumnValue(BinaryTupleReader reader, Tuple tuple, 
String colName, int colTypeId, int i) {
+        if (colTypeId == NESTED_TUPLE_FLAG) {
+            byte[] nestedTupleBytes = reader.bytesValue(i);
+            Tuple nestedTuple = unmarshal(nestedTupleBytes);
+            tuple.set(colName, nestedTuple);
+            return;
+        }
+
+        ColumnType colType = ColumnType.getById(colTypeId);
         switch (colType) {
             case NULL:
                 tuple.set(colName, null);
diff --git a/modules/binary-tuple/src/test/java/org/apache/ignite/internal/binarytuple/inlineschema/TupleWithSchemaMarshallingTest.java b/modules/binary-tuple/src/test/java/org/apache/ignite/internal/binarytuple/inlineschema/TupleWithSchemaMarshallingTest.java
index 687ada6bd1..6c448a5a1f 100644
--- a/modules/binary-tuple/src/test/java/org/apache/ignite/internal/binarytuple/inlineschema/TupleWithSchemaMarshallingTest.java
+++ b/modules/binary-tuple/src/test/java/org/apache/ignite/internal/binarytuple/inlineschema/TupleWithSchemaMarshallingTest.java
@@ -57,7 +57,8 @@ class TupleWithSchemaMarshallingTest {
                 Tuple.create().set("col14", "string"),
                 Tuple.create().set("col15", new byte[]{1, 2, 3}),
                 Tuple.create().set("col16", Period.ofDays(10)),
-                Tuple.create().set("col17", Duration.ofDays(10))
+                Tuple.create().set("col17", Duration.ofDays(10)),
+                Tuple.create().set("col18", Tuple.create().set("col1", 1))
         ).map(Arguments::of);
     }
 
@@ -67,8 +68,7 @@ class TupleWithSchemaMarshallingTest {
                 Tuple.create().set("col1", 1).set("col2", new Object()),
                 Tuple.create().set("col", new ArrayList<>()),
                 Tuple.create().set("col", new HashMap<>()),
-                Tuple.create().set("col", new HashMap<>()),
-                Tuple.create().set("col", Tuple.create())
+                Tuple.create().set("col", new HashMap<>())
         ).map(Arguments::of);
     }
 
@@ -106,7 +106,8 @@ class TupleWithSchemaMarshallingTest {
                 .set("col14", "string")
                 .set("col15", new byte[]{1, 2, 3})
                 .set("col16", Period.ofDays(10))
-                .set("col17", Duration.ofDays(10));
+                .set("col17", Duration.ofDays(10))
+                .set("col18", Tuple.create().set("col1", 1));
 
         byte[] marshalled = TupleWithSchemaMarshalling.marshal(tuple);
         assertEquals(tuple, TupleWithSchemaMarshalling.unmarshal(marshalled));
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
index 6e7c815b22..d8c09ecf63 100644
--- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
@@ -48,9 +48,11 @@ import java.util.stream.IntStream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.client.IgniteClient;
 import org.apache.ignite.compute.ComputeException;
+import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.compute.JobDescriptor;
 import org.apache.ignite.compute.JobExecution;
+import org.apache.ignite.compute.JobExecutionContext;
 import org.apache.ignite.compute.JobTarget;
 import org.apache.ignite.compute.TaskDescriptor;
 import org.apache.ignite.compute.task.TaskExecution;
@@ -489,6 +491,41 @@ public abstract class ItComputeBaseTest extends 
ClusterPerClassIntegrationTest {
 
     }
 
+    /**
+     * Tests that the nested tuples are correctly serialized and deserialized.
+     */
+    @Test
+    void nestedTuplesArgumentSerialization() {
+        Ignite entryNode = node(0);
+        String address = "127.0.0.1:" + 
unwrapIgniteImpl(entryNode).clientAddress().port();
+        try (IgniteClient client = 
IgniteClient.builder().addresses(address).build()) {
+            var argument = Tuple.create(
+                    Map.of("level1_key1", Tuple.create(
+                                    Map.of("level2_key1", Tuple.create(
+                                            Map.of("level3_key1", 
"level3_value1"))
+                                    )
+                            ),
+                            "level1_key2", Tuple.create(
+                                    Map.of("level2_key1", Tuple.create(
+                                            Map.of("level3_key1", 
"level3_value1"))
+                                    )
+                            ),
+                            "level1_key3", "Non-tuple-string-value",
+                            "level1_key4", 42
+                    )
+            );
+
+            Tuple resultTuple = client.compute().execute(
+                    JobTarget.node(clusterNode(node(1))),
+                    JobDescriptor.builder(TupleComputeJob.class)
+                            .build(),
+                    argument
+            );
+
+            assertThat(resultTuple, equalTo(argument));
+        }
+    }
+
     static Ignite node(int i) {
         return CLUSTER.node(i);
     }
@@ -509,6 +546,13 @@ public abstract class ItComputeBaseTest extends 
ClusterPerClassIntegrationTest {
         return PojoJob.class.getName();
     }
 
+    private static class TupleComputeJob implements ComputeJob<Tuple, Tuple> {
+
+        @Override
+        public @Nullable CompletableFuture<Tuple> 
executeAsync(JobExecutionContext context, @Nullable Tuple arg) {
+            return CompletableFuture.completedFuture(arg);
+        }
+    }
 
     static Class<PojoJob> pojoJobClass() {
         return PojoJob.class;

Reply via email to