This is an automated email from the ASF dual-hosted git repository.
apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new fcf2fade6a IGNITE-22643 Support Tuple with schema in Compute API
(#4142)
fcf2fade6a is described below
commit fcf2fade6a4b7630b96364e96feea07e0e4dd750
Author: Aleksandr Pakhomov <[email protected]>
AuthorDate: Thu Aug 22 18:29:39 2024 +0300
IGNITE-22643 Support Tuple with schema in Compute API (#4142)
To support Tuples in the Compute API, I've refactored some parts:
- ClientBinaryTupleUtils no longer accepts a Marshaller when appending an object.
- ClientComputeJobPacker and ClientComputeJobUnpacker are introduced.
They are meant to encapsulate all object-marshalling logic in
Compute.
---
gradle/libs.versions.toml | 1 +
.../java/org/apache/ignite/lang/ErrorGroups.java | 3 +
.../org/apache/ignite/marshalling/Marshaller.java | 60 +++-
...gException.java => UnmarshallingException.java} | 22 +-
.../UnsupportedObjectTypeMarshallingException.java | 13 +-
.../marshalling/ByteArrayMarshallerTest.java | 5 +-
modules/binary-tuple/build.gradle | 5 +-
.../inlineschema/TupleWithSchemaMarshalling.java | 373 +++++++++++++++++++++
.../TupleWithSchemaMarshallingTest.java | 132 ++++++++
.../client/proto/ClientBinaryTupleUtils.java | 6 +-
.../client/proto/ClientComputeJobPacker.java | 91 +++++
.../client/proto/ClientComputeJobUnpacker.java | 107 ++++++
.../internal/client/proto/ClientMessagePacker.java | 28 +-
.../internal/client/proto/ComputeJobType.java | 73 ++++
.../client/proto/StreamerReceiverSerializer.java | 4 +-
.../proto/event/JdbcBatchPreparedStmntRequest.java | 2 +-
.../jdbc/proto/event/JdbcQueryExecuteRequest.java | 2 +-
.../proto/ClientComputeJobPackerUnpackerTest.java | 247 ++++++++++++++
.../proto/ClientMessagePackerUnpackerTest.java | 8 +-
.../internal/client/proto/ComputeJobTypeTest.java | 60 ++++
.../ClientComputeExecuteColocatedRequest.java | 2 -
.../ClientComputeExecuteMapReduceRequest.java | 2 +-
.../compute/ClientComputeExecuteRequest.java | 22 +-
.../internal/client/compute/ClientCompute.java | 5 +-
.../client/compute/ClientJobExecution.java | 7 +-
.../ignite/internal/client/sql/ClientSql.java | 6 +-
.../apache/ignite/client/fakes/FakeCompute.java | 59 ++--
.../internal/compute/DelegatingJobExecution.java | 13 +-
.../ignite/internal/compute/ExecutionManager.java | 7 +
.../internal/compute/FailSafeJobExecution.java | 13 +-
.../ignite/internal/compute/IgniteComputeImpl.java | 78 ++---
.../internal/compute/IgniteComputeInternal.java | 11 +-
.../internal/compute/JobExecutionWrapper.java | 12 +-
.../internal/compute/MarshallerProvider.java} | 20 +-
.../compute/ResultMarshallingJobExecution.java | 18 +-
...n.java => ResultUnmarshallingJobExecution.java} | 22 +-
.../compute/executor/ComputeExecutorImpl.java | 24 +-
.../compute/executor/JobExecutionInternal.java | 14 +-
.../ignite/client/detail/compute/compute_impl.cpp | 71 +++-
modules/platforms/cpp/ignite/common/error_codes.h | 3 +-
modules/platforms/cpp/ignite/odbc/common_types.cpp | 1 +
.../dotnet/Apache.Ignite.Tests/FakeServer.cs | 2 +
.../platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs | 3 +
.../Apache.Ignite/Internal/Compute/Compute.cs | 6 +-
.../Internal/Compute/ComputePacker.cs | 122 +++++++
.../Internal/Proto/MsgPack/MsgPackReader.cs | 34 --
.../Internal/Proto/MsgPack/MsgPackWriter.cs | 50 ++-
.../client/ItThinClientComputeMarshallingTest.java | 38 ++-
...tThinClientComputeTypeCheckMarshallingTest.java | 7 +-
.../ItThinClientTupleComputeMarshallingTest.java | 191 +++++++++++
.../app/compute/ItEmbeddedMarshallingTest.java | 67 +++-
51 files changed, 1881 insertions(+), 291 deletions(-)
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index a66d0b66ae..03ceb99766 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -168,6 +168,7 @@ jansi-core = { module = "org.fusesource.jansi:jansi",
version.ref = "jansi" }
jackson-core = { module = "com.fasterxml.jackson.core:jackson-core",
version.ref = "jackson" }
jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind",
version.ref = "jackson" }
jackson-annotations = { module =
"com.fasterxml.jackson.core:jackson-annotations", version.ref = "jackson" }
+jackson-datatype-jsr310 = { module =
"com.fasterxml.jackson.datatype:jackson-datatype-jsr310", version.ref =
"jackson" }
typesafe-config = { module = "com.typesafe:config", version.ref = "typesafe" }
diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index c0eb3e42f8..4a173d618b 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -671,5 +671,8 @@ public class ErrorGroups {
/** Unsupported object type error. */
public static final int UNSUPPORTED_OBJECT_TYPE_ERR =
MARSHALLING_ERR_GROUP.registerErrorCode((short) 2);
+
+ /** The format of the bytes is wrong. */
+ public static final int UNMARSHALLING_ERR =
MARSHALLING_ERR_GROUP.registerErrorCode((short) 3);
}
}
diff --git
a/modules/api/src/main/java/org/apache/ignite/marshalling/Marshaller.java
b/modules/api/src/main/java/org/apache/ignite/marshalling/Marshaller.java
index 6f802f8740..3a9b65cef2 100644
--- a/modules/api/src/main/java/org/apache/ignite/marshalling/Marshaller.java
+++ b/modules/api/src/main/java/org/apache/ignite/marshalling/Marshaller.java
@@ -21,37 +21,75 @@ package org.apache.ignite.marshalling;
import org.jetbrains.annotations.Nullable;
/**
- * Object marshaller interface that is used in every Ignite API that requires
- * serialization/deserialization of user objects. If you want to define the way
- * your objects are serialized/deserialized, you can implement this interface
- * and pass it to the API that requires it.
+ * Object marshaller interface that is used in every Ignite API that requires
serialization/deserialization of user objects. If you want to
+ * define the way your objects are serialized/deserialized, you can implement
this interface and pass it to the API that requires it.
*
* <p>NOTE: The marshaller itself are not sent over the wire. This means that
if you
- * define a custom marshaller on the client, you must also define the
marshaller
- * on the server as well.
+ * define a custom marshaller on the client, you must also define the
marshaller on the server as well.
*
* @param <T> The object (T)ype.
- * @param <R> The (R)aw type, for example, {@code byte[]} or {@link
org.apache.ignite.table.Tuple}.
+ * @param <R> The (R)aw type, for example, {@code byte[]}.
*/
public interface Marshaller<T, R> {
/**
* Marshal the object into raw type.
*
* @param object object to marshal.
- *
* @return raw presentation of object.
* @throws UnsupportedObjectTypeMarshallingException if the given type can
not be marshalled with current instance.
*/
- @Nullable R marshal(@Nullable T object) throws
UnsupportedObjectTypeMarshallingException;
+ @Nullable
+ R marshal(@Nullable T object) throws
UnsupportedObjectTypeMarshallingException;
/**
* Unmarshal the raw type into object.
*
* @param raw raw presentation of object.
- *
* @return object.
* @throws UnsupportedObjectTypeMarshallingException if the given type can
not be unmarshalled with current instance.
*/
- @Nullable T unmarshal(@Nullable R raw) throws
UnsupportedObjectTypeMarshallingException;
+ @Nullable
+ T unmarshal(@Nullable R raw) throws
UnsupportedObjectTypeMarshallingException;
+
+ /**
+ * Try to marshal the given object if the marshaller is not null; otherwise the object
is cast directly to the target type.
+ *
+ * @param self the marshaller instance.
+ * @param object to marshal.
+ * @param <T> The object (T)ype.
+ * @param <R> The (R)aw type, for example, {@code byte[]}.
+ */
+ static <T, R> @Nullable R tryMarshalOrCast(@Nullable Marshaller<T, R>
self, @Nullable Object object) {
+ if (self != null) {
+ try {
+ T castedObj = (T) object;
+ return self.marshal(castedObj);
+ } catch (ClassCastException ignored) {
+ // ignore.
+ }
+ }
+
+ return (R) object;
+ }
+
+ /**
+ * Try to unmarshal the given object if the marshaller is not null; otherwise the
object is cast directly to the target type.
+ *
+ * @param self the marshaller instance.
+ * @param <T> The object (T)ype.
+ * @param <R> The (R)aw type, for example, {@code byte[]}.
+ */
+ static <T, R> @Nullable T tryUnmarshalOrCast(@Nullable Marshaller<T, R>
self, @Nullable Object raw) {
+ if (self != null) {
+ try {
+ R rawType = (R) raw;
+ return self.unmarshal(rawType);
+ } catch (ClassCastException ignored) {
+ // ignore.
+ }
+ }
+
+ return (T) raw;
+ }
}
diff --git
a/modules/api/src/main/java/org/apache/ignite/marshalling/UnsupportedObjectTypeMarshallingException.java
b/modules/api/src/main/java/org/apache/ignite/marshalling/UnmarshallingException.java
similarity index 66%
copy from
modules/api/src/main/java/org/apache/ignite/marshalling/UnsupportedObjectTypeMarshallingException.java
copy to
modules/api/src/main/java/org/apache/ignite/marshalling/UnmarshallingException.java
index bb8b4ba636..5edd341007 100644
---
a/modules/api/src/main/java/org/apache/ignite/marshalling/UnsupportedObjectTypeMarshallingException.java
+++
b/modules/api/src/main/java/org/apache/ignite/marshalling/UnmarshallingException.java
@@ -21,22 +21,14 @@ import java.util.UUID;
import org.apache.ignite.lang.ErrorGroups.Marshalling;
import org.apache.ignite.lang.IgniteException;
-/**
- * Exception thrown when an object type is not supported by the marshaller.
- */
-public class UnsupportedObjectTypeMarshallingException extends IgniteException
{
+/** Exception thrown when unmarshalling failed. */
+public class UnmarshallingException extends IgniteException {
private static final long serialVersionUID = -8131613381875542450L;
- /**
- * Creates an exception with the given unsupported type.
- *
- * @param unsupportedType Unsupported type.
- */
- public UnsupportedObjectTypeMarshallingException(Class<?> unsupportedType)
{
- super(
- Marshalling.UNSUPPORTED_OBJECT_TYPE_ERR,
- "Unsupported object type: " + unsupportedType.getName() + ".
Please, define the marshaller that can handle this type."
- );
+
+ /** Constructor. */
+ public UnmarshallingException(String msg) {
+ super(Marshalling.UNMARSHALLING_ERR, msg);
}
/**
@@ -47,7 +39,7 @@ public class UnsupportedObjectTypeMarshallingException
extends IgniteException {
* @param message Detailed message.
* @param cause Optional nested exception (can be {@code null}).
*/
- public UnsupportedObjectTypeMarshallingException(UUID traceId, int code,
String message, Throwable cause) {
+ public UnmarshallingException(UUID traceId, int code, String message,
Throwable cause) {
super(traceId, code, message, cause);
}
}
diff --git
a/modules/api/src/main/java/org/apache/ignite/marshalling/UnsupportedObjectTypeMarshallingException.java
b/modules/api/src/main/java/org/apache/ignite/marshalling/UnsupportedObjectTypeMarshallingException.java
index bb8b4ba636..90ab7b4fa2 100644
---
a/modules/api/src/main/java/org/apache/ignite/marshalling/UnsupportedObjectTypeMarshallingException.java
+++
b/modules/api/src/main/java/org/apache/ignite/marshalling/UnsupportedObjectTypeMarshallingException.java
@@ -27,16 +27,21 @@ import org.apache.ignite.lang.IgniteException;
public class UnsupportedObjectTypeMarshallingException extends IgniteException
{
private static final long serialVersionUID = -8131613381875542450L;
+
/**
* Creates an exception with the given unsupported type.
*
* @param unsupportedType Unsupported type.
*/
public UnsupportedObjectTypeMarshallingException(Class<?> unsupportedType)
{
- super(
- Marshalling.UNSUPPORTED_OBJECT_TYPE_ERR,
- "Unsupported object type: " + unsupportedType.getName() + ".
Please, define the marshaller that can handle this type."
- );
+ this("Unsupported object type: " + unsupportedType.getName() + ".
Please, define a marshaller that can handle this type.");
+ }
+
+ /**
+ * Creates an exception with the given message.
+ */
+ public UnsupportedObjectTypeMarshallingException(String msg) {
+ super(Marshalling.UNSUPPORTED_OBJECT_TYPE_ERR, msg);
}
/**
diff --git
a/modules/api/src/test/java/org/apache/ignite/marshalling/ByteArrayMarshallerTest.java
b/modules/api/src/test/java/org/apache/ignite/marshalling/ByteArrayMarshallerTest.java
index fab85e3a17..dbdd9f7ec5 100644
---
a/modules/api/src/test/java/org/apache/ignite/marshalling/ByteArrayMarshallerTest.java
+++
b/modules/api/src/test/java/org/apache/ignite/marshalling/ByteArrayMarshallerTest.java
@@ -64,8 +64,7 @@ class ByteArrayMarshallerTest {
private static Stream<Arguments> cornerValues() {
return Stream.of(
"", ".*456+576+$%^&*()_+{}|:<>?`~", "1", "0",
- -1, 0,
- new Object()
+ -1, 0
).map(Arguments::of);
}
@@ -86,7 +85,9 @@ class ByteArrayMarshallerTest {
@ParameterizedTest
@MethodSource("cornerValues")
void cornerValuesMarshalling(Object obj) {
+ ByteArrayMarshaller<Object> marshaller = ByteArrayMarshaller.create();
+ assertEquals(obj, marshaller.unmarshal(marshaller.marshal(obj)));
}
diff --git a/modules/binary-tuple/build.gradle
b/modules/binary-tuple/build.gradle
index 8200416793..21a695a331 100644
--- a/modules/binary-tuple/build.gradle
+++ b/modules/binary-tuple/build.gradle
@@ -26,5 +26,8 @@ dependencies {
implementation project(':ignite-api')
implementation project(':ignite-core')
implementation libs.jetbrains.annotations
-}
+ testImplementation libs.jackson.core
+ testImplementation libs.jackson.databind
+ testImplementation libs.jackson.datatype.jsr310
+}
diff --git
a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/inlineschema/TupleWithSchemaMarshalling.java
b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/inlineschema/TupleWithSchemaMarshalling.java
new file mode 100644
index 0000000000..ace65ac1e1
--- /dev/null
+++
b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/inlineschema/TupleWithSchemaMarshalling.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.binarytuple.inlineschema;
+
+
+import static
org.apache.ignite.lang.ErrorGroups.Marshalling.UNSUPPORTED_OBJECT_TYPE_ERR;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.Period;
+import java.util.UUID;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.marshalling.UnmarshallingException;
+import org.apache.ignite.marshalling.UnsupportedObjectTypeMarshallingException;
+import org.apache.ignite.sql.ColumnType;
+import org.apache.ignite.table.Tuple;
+import org.jetbrains.annotations.Nullable;
+
+/** Tuple with schema marshalling. */
+public final class TupleWithSchemaMarshalling {
+ private static final ByteOrder BYTE_ORDER = ByteOrder.LITTLE_ENDIAN;
+
+ /**
+ * Marshal tuple in the following format (LITTLE_ENDIAN).
+ *
+ * <pre>
+ * marshalledTuple := | size | valuePosition | binaryTupleWithSchema |
+ *
+ * size := int32
+ * valuePosition := int32
+ * binaryTupleWithSchema := | schemaBinaryTuple | valueBinaryTuple |
+ * schemaBinaryTuple := | column1 | ... | columnN |
+ * column := | columnName | columnType |
+ * columnName := string
+ * columnType := int32
+ * valueBinaryTuple := | value1 | ... | valueN |.
+ * </pre>
+ */
+ public static byte @Nullable [] marshal(@Nullable Tuple tuple) {
+ if (tuple == null) {
+ return null;
+ }
+
+ // Allocate all the memory we need upfront.
+ int size = tuple.columnCount();
+ Object[] values = new Object[size];
+ String[] columns = new String[size];
+ ColumnType[] types = new ColumnType[size];
+
+ // Fill in the values, column names, and types.
+ for (int i = 0; i < size; i++) {
+ var value = tuple.value(i);
+ values[i] = value;
+ columns[i] = tuple.columnName(i);
+ types[i] = inferType(value);
+ }
+
+ ByteBuffer schemaBuff = schemaBuilder(columns, types).build();
+ ByteBuffer valueBuff = valueBuilder(columns, types, values).build();
+
+ int schemaBuffLen = schemaBuff.remaining();
+ int valueBuffLen = valueBuff.remaining();
+
+ // Size: int32 (tuple size), int32 (value offset), schema, value.
+ byte[] result = new byte[4 + 4 + schemaBuffLen + valueBuffLen];
+ ByteBuffer buff = ByteBuffer.wrap(result).order(BYTE_ORDER);
+
+ // Put the tuple size (column count) in the first 4 bytes.
+ buff.putInt(size);
+
+ // Put the value offset in the second 4 bytes.
+ int offset = schemaBuffLen + 8;
+
+ buff.putInt(offset);
+
+ buff.put(schemaBuff);
+ buff.put(valueBuff);
+
+ return result;
+ }
+
+ /**
+ * Unmarshal tuple (LITTLE_ENDIAN).
+ *
+ * @param raw byte[] bytes that are marshaled by {@link #marshal(Tuple)}.
+ */
+ public static @Nullable Tuple unmarshal(byte @Nullable [] raw) {
+ if (raw == null) {
+ return null;
+ }
+ if (raw.length < 8) {
+ throw new UnmarshallingException("byte[] length can not be less
than 8");
+ }
+
+ // Read first int32.
+ ByteBuffer buff = ByteBuffer.wrap(raw).order(BYTE_ORDER);
+ int size = buff.getInt(0);
+ if (size < 0) {
+ throw new UnmarshallingException("Size of the tuple can not be
less than zero");
+ }
+
+ // Read second int32.
+ int valueOffset = buff.getInt(4);
+ if (valueOffset < 0) {
+ throw new UnmarshallingException("valueOffset can not be less than
zero");
+ }
+ if (valueOffset > raw.length) {
+ throw new UnmarshallingException(
+ "valueOffset can not be greater than byte[] length,
valueOffset: "
+ + valueOffset + ", length: " + raw.length
+ );
+ }
+
+ ByteBuffer schemaBuff = buff
+ .position(8).limit(valueOffset)
+ .slice().order(BYTE_ORDER);
+ ByteBuffer valueBuff = buff
+ .position(valueOffset).limit(raw.length)
+ .slice().order(BYTE_ORDER);
+
+ BinaryTupleReader schemaReader = new BinaryTupleReader(size * 2,
schemaBuff);
+ BinaryTupleReader valueReader = new BinaryTupleReader(size, valueBuff);
+
+ Tuple tup = Tuple.create(size);
+
+ for (int i = 0; i < size; i++) {
+ String colName = schemaReader.stringValue(i * 2);
+ int colTypeId = schemaReader.intValue(i * 2 + 1);
+
+ setColumnValue(valueReader, tup, colName,
ColumnType.getById(colTypeId), i);
+ }
+
+ return tup;
+ }
+
+ private static BinaryTupleBuilder schemaBuilder(String[] columns,
ColumnType[] types) {
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(columns.length *
2);
+
+ for (int i = 0; i < columns.length; i++) {
+ builder.appendString(columns[i]);
+ builder.appendInt(types[i].id());
+ }
+
+ return builder;
+ }
+
+ private static BinaryTupleBuilder valueBuilder(String[] columnNames,
ColumnType[] types, Object[] values) {
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(values.length);
+
+ for (int i = 0; i < values.length; i++) {
+ ColumnType type = types[i];
+ Object v = values[i];
+
+ append(type, columnNames[i], builder, v);
+ }
+
+ return builder;
+ }
+
+ private static void append(ColumnType type, String name,
BinaryTupleBuilder builder, Object value) {
+ try {
+ switch (type) {
+ case NULL:
+ builder.appendNull();
+ return;
+ case BOOLEAN:
+ builder.appendBoolean((Boolean) value);
+ return;
+ case INT8:
+ builder.appendByte((Byte) value);
+ return;
+ case INT16:
+ builder.appendShort((Short) value);
+ return;
+ case INT32:
+ builder.appendInt((Integer) value);
+ return;
+ case INT64:
+ builder.appendLong((Long) value);
+ return;
+ case FLOAT:
+ builder.appendFloat((Float) value);
+ return;
+ case DOUBLE:
+ builder.appendDouble((Double) value);
+ return;
+ case STRING:
+ builder.appendString((String) value);
+ return;
+ case DECIMAL:
+ BigDecimal d = (BigDecimal) value;
+ builder.appendDecimal(d, d.scale());
+ return;
+ case DATE:
+ builder.appendDate((LocalDate) value);
+ return;
+ case TIME:
+ builder.appendTime((LocalTime) value);
+ return;
+ case DATETIME:
+ builder.appendDateTime((LocalDateTime) value);
+ return;
+ case TIMESTAMP:
+ builder.appendTimestamp((Instant) value);
+ return;
+ case UUID:
+ builder.appendUuid((UUID) value);
+ return;
+ case BYTE_ARRAY:
+ builder.appendBytes((byte[]) value);
+ return;
+ case PERIOD:
+ builder.appendPeriod((Period) value);
+ return;
+ case DURATION:
+ builder.appendDuration((Duration) value);
+ return;
+ default:
+ throw new IllegalArgumentException("Unsupported type: " +
type);
+ }
+ } catch (ClassCastException e) {
+ throw new IgniteException(UNSUPPORTED_OBJECT_TYPE_ERR, "Column's
type mismatch ["
+ + "column=" + name
+ + ", expectedType=" + type.javaClass(),
+ e
+ );
+ }
+ }
+
+ private static ColumnType inferType(@Nullable Object value) {
+ if (value == null) {
+ return ColumnType.NULL;
+ }
+ if (value instanceof Boolean) {
+ return ColumnType.BOOLEAN;
+ }
+ if (value instanceof Byte) {
+ return ColumnType.INT8;
+ }
+ if (value instanceof Short) {
+ return ColumnType.INT16;
+ }
+ if (value instanceof Integer) {
+ return ColumnType.INT32;
+ }
+ if (value instanceof Long) {
+ return ColumnType.INT64;
+ }
+ if (value instanceof Float) {
+ return ColumnType.FLOAT;
+ }
+ if (value instanceof Double) {
+ return ColumnType.DOUBLE;
+ }
+ if (value instanceof String) {
+ return ColumnType.STRING;
+ }
+ if (value instanceof BigDecimal) {
+ return ColumnType.DECIMAL;
+ }
+ if (value instanceof LocalDate) {
+ return ColumnType.DATE;
+ }
+ if (value instanceof LocalTime) {
+ return ColumnType.TIME;
+ }
+ if (value instanceof LocalDateTime) {
+ return ColumnType.DATETIME;
+ }
+ if (value instanceof Instant) {
+ return ColumnType.TIMESTAMP;
+ }
+ if (value instanceof UUID) {
+ return ColumnType.UUID;
+ }
+ if (value instanceof byte[]) {
+ return ColumnType.BYTE_ARRAY;
+ }
+ if (value instanceof Period) {
+ return ColumnType.PERIOD;
+ }
+ if (value instanceof Duration) {
+ return ColumnType.DURATION;
+ }
+ throw new UnsupportedObjectTypeMarshallingException("Tuple field is of
unsupported type: " + value.getClass());
+ }
+
+
+ private static void setColumnValue(BinaryTupleReader reader, Tuple tuple,
String colName, ColumnType colType, int i) {
+ switch (colType) {
+ case NULL:
+ tuple.set(colName, null);
+ break;
+ case BOOLEAN:
+ tuple.set(colName, reader.booleanValue(i));
+ break;
+ case INT8:
+ tuple.set(colName, reader.byteValue(i));
+ break;
+ case INT16:
+ tuple.set(colName, reader.shortValue(i));
+ break;
+ case INT32:
+ tuple.set(colName, reader.intValue(i));
+ break;
+ case INT64:
+ tuple.set(colName, reader.longValue(i));
+ break;
+ case FLOAT:
+ tuple.set(colName, reader.floatValue(i));
+ break;
+ case DOUBLE:
+ tuple.set(colName, reader.doubleValue(i));
+ break;
+ case STRING:
+ tuple.set(colName, reader.stringValue(i));
+ break;
+ case DECIMAL:
+ BigDecimal decimal = reader.decimalValue(i, Integer.MIN_VALUE);
+ tuple.set(colName, decimal);
+ break;
+ case DATE:
+ tuple.set(colName, reader.dateValue(i));
+ break;
+ case TIME:
+ tuple.set(colName, reader.timeValue(i));
+ break;
+ case DATETIME:
+ tuple.set(colName, reader.dateTimeValue(i));
+ break;
+ case TIMESTAMP:
+ tuple.set(colName, reader.timestampValue(i));
+ break;
+ case UUID:
+ tuple.set(colName, reader.uuidValue(i));
+ break;
+ case BYTE_ARRAY:
+ tuple.set(colName, reader.bytesValue(i));
+ break;
+ case PERIOD:
+ tuple.set(colName, reader.periodValue(i));
+ break;
+ case DURATION:
+ tuple.set(colName, reader.durationValue(i));
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported type: " +
colType);
+ }
+ }
+}
diff --git
a/modules/binary-tuple/src/test/java/org/apache/ignite/internal/binarytuple/inlineschema/TupleWithSchemaMarshallingTest.java
b/modules/binary-tuple/src/test/java/org/apache/ignite/internal/binarytuple/inlineschema/TupleWithSchemaMarshallingTest.java
new file mode 100644
index 0000000000..687ada6bd1
--- /dev/null
+++
b/modules/binary-tuple/src/test/java/org/apache/ignite/internal/binarytuple/inlineschema/TupleWithSchemaMarshallingTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.binarytuple.inlineschema;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.Period;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.ignite.marshalling.UnmarshallingException;
+import org.apache.ignite.marshalling.UnsupportedObjectTypeMarshallingException;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+class TupleWithSchemaMarshallingTest {
+ private static Stream<Arguments> oneFieldTuple() {
+ return Stream.of(
+ Tuple.create().set("col", 1),
+ Tuple.create().set("col2", true),
+ Tuple.create().set("col3", (byte) 1),
+ Tuple.create().set("col4", (short) 2),
+ Tuple.create().set("col5", 3),
+ Tuple.create().set("col6", 4L),
+ Tuple.create().set("col7", 5.0f),
+ Tuple.create().set("col8", 6.0),
+ Tuple.create().set("col9", new BigDecimal("7.1")),
+ Tuple.create().set("col10", LocalDate.of(2024, 1, 1)),
+ Tuple.create().set("col11", LocalTime.of(12, 0)),
+ Tuple.create().set("col12", LocalDate.of(2024, 1,
1).atTime(LocalTime.of(12, 0))),
+ Tuple.create().set("col13",
UUID.fromString("123e4567-e89b-12d3-a456-426614174000")),
+ Tuple.create().set("col14", "string"),
+ Tuple.create().set("col15", new byte[]{1, 2, 3}),
+ Tuple.create().set("col16", Period.ofDays(10)),
+ Tuple.create().set("col17", Duration.ofDays(10))
+ ).map(Arguments::of);
+ }
+
+ private static Stream<Arguments> unsupportedTypes() {
+ return Stream.of(
+ Tuple.create().set("col", new Object()),
+ Tuple.create().set("col1", 1).set("col2", new Object()),
+ Tuple.create().set("col", new ArrayList<>()),
+ Tuple.create().set("col", new HashMap<>()),
+ Tuple.create().set("col", new HashMap<>()),
+ Tuple.create().set("col", Tuple.create())
+ ).map(Arguments::of);
+ }
+
+ private static Stream<Arguments> wrongByteLayout() {
+ return Stream.of(
+ new byte[]{},
+ new byte[]{1},
+ new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9}
+ ).map(Arguments::of);
+ }
+
+ @ParameterizedTest
+ @MethodSource("oneFieldTuple")
+ void serDeOneFieldTuple(Tuple tuple) {
+ byte[] marshalled = TupleWithSchemaMarshalling.marshal(tuple);
+ assertEquals(tuple, TupleWithSchemaMarshalling.unmarshal(marshalled));
+ }
+
+ @Test
+ void allFieldsTupleTest() {
+ Tuple tuple = Tuple.create()
+ .set("col1", null)
+ .set("col2", true)
+ .set("col3", (byte) 1)
+ .set("col4", (short) 2)
+ .set("col5", 3)
+ .set("col6", 4L)
+ .set("col7", 5.0f)
+ .set("col8", 6.0)
+ .set("col9", new BigDecimal("7.11"))
+ .set("col10", LocalDate.of(2024, 1, 1))
+ .set("col11", LocalTime.of(12, 0))
+ .set("col12", LocalDate.of(2024, 1, 1).atTime(LocalTime.of(12,
0)))
+ .set("col13",
UUID.fromString("123e4567-e89b-12d3-a456-426614174000"))
+ .set("col14", "string")
+ .set("col15", new byte[]{1, 2, 3})
+ .set("col16", Period.ofDays(10))
+ .set("col17", Duration.ofDays(10));
+
+ byte[] marshalled = TupleWithSchemaMarshalling.marshal(tuple);
+ assertEquals(tuple, TupleWithSchemaMarshalling.unmarshal(marshalled));
+ }
+
+ @Test
+ void nullTuple() {
+ byte[] marshalled = TupleWithSchemaMarshalling.marshal(null);
+ assertNull(TupleWithSchemaMarshalling.unmarshal(marshalled));
+ }
+
+ @MethodSource("unsupportedTypes")
+ @ParameterizedTest
+ void unsupportedTypesMarshal(Tuple tup) {
+ assertThrows(UnsupportedObjectTypeMarshallingException.class, () ->
TupleWithSchemaMarshalling.marshal(tup));
+ }
+
+ @MethodSource("wrongByteLayout")
+ @ParameterizedTest
+ void unsupportedTypesUnmarshal(byte[] obj) {
+ assertThrows(UnmarshallingException.class, () ->
TupleWithSchemaMarshalling.unmarshal(obj));
+ }
+}
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientBinaryTupleUtils.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientBinaryTupleUtils.java
index abd3582c7e..5b30458977 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientBinaryTupleUtils.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientBinaryTupleUtils.java
@@ -40,7 +40,6 @@ import org.apache.ignite.internal.type.NativeType;
import org.apache.ignite.internal.type.NativeTypeSpec;
import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.marshalling.Marshaller;
import org.apache.ignite.sql.ColumnType;
import org.jetbrains.annotations.Nullable;
@@ -185,14 +184,11 @@ public class ClientBinaryTupleUtils {
* @param builder Builder.
* @param obj Object.
*/
- public static <T> void appendObject(BinaryTupleBuilder builder, @Nullable
T obj, @Nullable Marshaller<T, byte[]> marshaller) {
+ public static <T> void appendObject(BinaryTupleBuilder builder, @Nullable
T obj) {
if (obj == null) {
builder.appendNull(); // Type.
builder.appendNull(); // Scale.
builder.appendNull(); // Value.
- } else if (marshaller != null) {
- appendTypeAndScale(builder, ColumnType.BYTE_ARRAY);
- builder.appendBytes(marshaller.marshal(obj));
} else if (obj instanceof Boolean) {
appendTypeAndScale(builder, ColumnType.BOOLEAN);
builder.appendBoolean((Boolean) obj);
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobPacker.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobPacker.java
new file mode 100644
index 0000000000..0636f80c5e
--- /dev/null
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobPacker.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.proto;
+
+import
org.apache.ignite.internal.binarytuple.inlineschema.TupleWithSchemaMarshalling;
+import org.apache.ignite.marshalling.Marshaller;
+import org.apache.ignite.table.Tuple;
+import org.jetbrains.annotations.Nullable;
+
+/** Packs job arguments and results. */
+public final class ClientComputeJobPacker {
+ /**
+ * Packs compute job argument. If the marshaller is provided, it will be
used to marshal the argument. If the marshaller is not provided
+ * and the argument is a native column type or a tuple, it will be packed
accordingly.
+ *
+ * @param arg Argument.
+ * @param marshaller Marshaller.
+ * @param packer Packer.
+ * @param <T> Argument type.
+ */
+ public static <T> void packJobArgument(@Nullable T arg, @Nullable
Marshaller<T, byte[]> marshaller, ClientMessagePacker packer) {
+ pack(arg, marshaller, packer);
+ }
+
+ /**
+ * Packs compute job result. If the marshaller is provided, it will be
used to marshal the result. If the marshaller is not provided and
+ * the result is a native column type or a tuple, it will be packed
accordingly.
+ *
+ * @param res Result.
+ * @param marshaller Marshaller.
+ * @param packer Packer.
+ * @param <T> Result type.
+ */
+ public static <T> void packJobResult(@Nullable T res, @Nullable
Marshaller<T, byte[]> marshaller, ClientMessagePacker packer) {
+ pack(res, marshaller, packer);
+ }
+
+ /** Packs object in the format: | typeId | value |. */
+ private static <T> void pack(@Nullable T obj, @Nullable Marshaller<T,
byte[]> marshaller, ClientMessagePacker packer) {
+ if (marshaller != null) {
+ packer.packInt(ComputeJobType.MARSHALLED_OBJECT_ID);
+ byte[] marshalled = marshaller.marshal(obj);
+
+ if (marshalled == null) {
+ packer.packNil();
+ return;
+ }
+
+ packer.packBinary(marshalled);
+ return;
+ }
+
+ if (obj instanceof Tuple) {
+ byte[] marshalledTuple =
TupleWithSchemaMarshalling.marshal((Tuple) obj);
+
+ packer.packInt(ComputeJobType.MARSHALLED_TUPLE_ID);
+
+ if (marshalledTuple == null) {
+ packer.packNil();
+ return;
+ }
+
+ packer.packBinary(marshalledTuple);
+ return;
+ }
+
+ if (obj == null) {
+ packer.packNil();
+ return;
+ }
+
+ packer.packInt(ComputeJobType.NATIVE_ID);
+
+ packer.packObjectAsBinaryTuple(obj);
+ }
+}
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobUnpacker.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobUnpacker.java
new file mode 100644
index 0000000000..0501ce9d29
--- /dev/null
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobUnpacker.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.proto;
+
+import static org.apache.ignite.marshalling.Marshaller.tryUnmarshalOrCast;
+
+import
org.apache.ignite.internal.binarytuple.inlineschema.TupleWithSchemaMarshalling;
+import org.apache.ignite.marshalling.Marshaller;
+import org.apache.ignite.marshalling.UnmarshallingException;
+import org.jetbrains.annotations.Nullable;
+
+/** Unpacks job arguments and results. */
+public final class ClientComputeJobUnpacker {
+ /**
+ * Unpacks compute job argument. If the marshaller is provided, it will be
used to unmarshal the argument. If the marshaller is not
+ * provided and the argument is a native column type or a tuple, it will
be unpacked accordingly.
+ *
+ * @param marshaller Marshaller.
+ * @param unpacker Unpacker.
+ * @return Unpacked argument.
+ */
+ public static @Nullable Object unpackJobArgument(@Nullable Marshaller<?,
byte[]> marshaller, ClientMessageUnpacker unpacker) {
+ return unpack(marshaller, unpacker);
+ }
+
+ /**
+ * Unpacks compute job result. If the marshaller is provided, it will be
used to unmarshal the result. If the marshaller is not provided
+ * and the result is a native column type or a tuple, it will be unpacked
accordingly.
+ *
+ * @param marshaller Marshaller.
+ * @param unpacker Unpacker.
+ * @return Unpacked result.
+ */
+ public static @Nullable Object unpackJobResult(@Nullable Marshaller<?,
byte[]> marshaller, ClientMessageUnpacker unpacker) {
+ return unpack(marshaller, unpacker);
+ }
+
+ /** Underlying byte array expected to be in the following format: | typeId
| value |. */
+ private static @Nullable Object unpack(@Nullable Marshaller<?, byte[]>
marshaller, ClientMessageUnpacker unpacker) {
+ if (unpacker.tryUnpackNil()) {
+ return null;
+ }
+
+ int typeId = unpacker.unpackInt();
+ var type = ComputeJobType.Type.fromId(typeId);
+
+ switch (type) {
+ case NATIVE:
+ if (marshaller != null) {
+ throw new UnmarshallingException(
+ "Can not unpack object because the marshaller is
provided but the object was packed without marshaller."
+ );
+ }
+
+ return unpacker.unpackObjectFromBinaryTuple();
+ case MARSHALLED_TUPLE:
+ return
TupleWithSchemaMarshalling.unmarshal(unpacker.readBinary());
+
+ case MARSHALLED_OBJECT:
+ if (marshaller == null) {
+ throw new UnmarshallingException(
+ "Can not unpack object because the marshaller is
not provided but the object was packed with marshaller."
+ );
+ }
+ return tryUnmarshalOrCast(marshaller, unpacker.readBinary());
+
+ default:
+ throw new UnmarshallingException("Unsupported compute job type
id: " + typeId);
+ }
+ }
+
+ /** Unpacks compute job argument without marshaller. */
+ public static Object
unpackJobArgumentWithoutMarshaller(ClientMessageUnpacker unpacker) {
+ if (unpacker.tryUnpackNil()) {
+ return null;
+ }
+
+ int typeId = unpacker.unpackInt();
+ var type = ComputeJobType.Type.fromId(typeId);
+
+ switch (type) {
+ case NATIVE:
+ return unpacker.unpackObjectFromBinaryTuple();
+ case MARSHALLED_TUPLE:
+ return
TupleWithSchemaMarshalling.unmarshal(unpacker.readBinary());
+ case MARSHALLED_OBJECT:
+ return unpacker.readBinary();
+ default:
+ throw new UnmarshallingException("Unsupported compute job type
id: " + typeId);
+ }
+ }
+}
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
index 3a93bb98e4..deb3be15f2 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
@@ -30,13 +30,11 @@ import java.util.UUID;
import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.binarytuple.BinaryTupleParser;
-import org.apache.ignite.marshalling.Marshaller;
import org.apache.ignite.sql.BatchedArguments;
import org.jetbrains.annotations.Nullable;
/**
- * ByteBuf-based MsgPack implementation. Replaces {@link
org.msgpack.core.MessagePacker} to avoid
- * extra buffers and indirection.
+ * ByteBuf-based MsgPack implementation. Replaces {@link
org.msgpack.core.MessagePacker} to avoid extra buffers and indirection.
*
* <p>Releases wrapped buffer on {@link #close()} .
*/
@@ -379,7 +377,7 @@ public class ClientMessagePacker implements AutoCloseable {
*
* <p>Should be followed by {@link #writePayload(byte[])} method to write
the extension body.
*
- * @param extType the extension type tag to be written.
+ * @param extType the extension type tag to be written.
* @param payloadLen number of bytes of a payload binary to be written.
*/
public void packExtensionTypeHeader(byte extType, int payloadLen) {
@@ -599,7 +597,7 @@ public class ClientMessagePacker implements AutoCloseable {
*
* @param vals Object array.
*/
- public void packObjectArrayAsBinaryTuple(Object @Nullable [] vals,
@Nullable Marshaller<Object, byte[]> marshaller) {
+ public void packObjectArrayAsBinaryTuple(Object @Nullable [] vals) {
assert !closed : "Packer is closed";
if (vals == null) {
@@ -619,7 +617,7 @@ public class ClientMessagePacker implements AutoCloseable {
var builder = new BinaryTupleBuilder(vals.length * 3);
for (Object arg : vals) {
- ClientBinaryTupleUtils.appendObject(builder, arg, marshaller);
+ ClientBinaryTupleUtils.appendObject(builder, arg);
}
packBinaryTuple(builder);
@@ -651,7 +649,7 @@ public class ClientMessagePacker implements AutoCloseable {
var builder = new BinaryTupleBuilder(rowLen * 3);
for (Object value : values) {
- ClientBinaryTupleUtils.appendObject(builder, value, null);
+ ClientBinaryTupleUtils.appendObject(builder, value);
}
packBinaryTuple(builder);
@@ -663,7 +661,7 @@ public class ClientMessagePacker implements AutoCloseable {
*
* @param val Object array.
*/
- public <T> void packObjectAsBinaryTuple(@Nullable T val, @Nullable
Marshaller<T, byte[]> marshaller) {
+ public <T> void packObjectAsBinaryTuple(@Nullable T val) {
assert !closed : "Packer is closed";
if (val == null) {
@@ -675,7 +673,7 @@ public class ClientMessagePacker implements AutoCloseable {
// Builder with inline schema.
// Value is represented by 3 tuple elements: type, scale, value.
var builder = new BinaryTupleBuilder(3, 3, false);
- ClientBinaryTupleUtils.appendObject(builder, val, marshaller);
+ ClientBinaryTupleUtils.appendObject(builder, val);
packBinaryTuple(builder);
}
@@ -718,6 +716,18 @@ public class ClientMessagePacker implements AutoCloseable {
packByteBuffer(buf);
}
+ /**
+ * Pack binary.
+ *
+ * @param buf Byte array.
+ */
+ public void packBinary(byte[] buf) {
+ assert !closed : "Packer is closed";
+
+ packBinaryHeader(buf.length);
+ writePayload(buf);
+ }
+
/**
* Pack ByteBuffer.
*
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ComputeJobType.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ComputeJobType.java
new file mode 100644
index 0000000000..4cd5847af1
--- /dev/null
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ComputeJobType.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.proto;
+
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * The type of the object that can be passed/returned to/from the compute job.
It can be a native type that is represented by
+ * {@link ColumnType} or a marshalled object/tuple.
+ */
+public class ComputeJobType {
+ static final int NATIVE_ID = 0;
+ static final int MARSHALLED_TUPLE_ID = 1;
+ static final int MARSHALLED_OBJECT_ID = 2;
+
+ /**
+ * Type id: 0 - native type, 1 - marshalled tuple, 2 - marshalled object.
+ * Any other id is rejected with an IllegalArgumentException.
+ */
+ private final int id;
+ private final Type type;
+
+ ComputeJobType(int id) {
+ this.id = id;
+ this.type = Type.fromId(id);
+ }
+
+ /** Return the id of the type. */
+ public int id() {
+ return id;
+ }
+
+ /** Return the type of the object. */
+ public Type type() {
+ return type;
+ }
+
+ /** The type of the object. */
+ public enum Type {
+ MARSHALLED_TUPLE, MARSHALLED_OBJECT, NATIVE;
+
+ static Type fromId(int id) {
+ if (id == MARSHALLED_TUPLE_ID) {
+ return MARSHALLED_TUPLE;
+ }
+
+ if (id == MARSHALLED_OBJECT_ID) {
+ return MARSHALLED_OBJECT;
+ }
+
+ if (id == NATIVE_ID) {
+ return NATIVE;
+ }
+
+ throw new IllegalArgumentException("Unsupported type id: " + id);
+ }
+ }
+}
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/StreamerReceiverSerializer.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/StreamerReceiverSerializer.java
index 3701c6ff6c..2541816c67 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/StreamerReceiverSerializer.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/StreamerReceiverSerializer.java
@@ -55,7 +55,7 @@ public class StreamerReceiverSerializer {
var builder = new BinaryTupleBuilder(binaryTupleSize);
builder.appendString(receiverClassName);
- ClientBinaryTupleUtils.appendObject(builder, receiverArgs,
receiverArgsMarshaller);
+ ClientBinaryTupleUtils.appendObject(builder, receiverArgs);
ClientBinaryTupleUtils.appendCollectionToBinaryTuple(builder, items);
@@ -79,7 +79,7 @@ public class StreamerReceiverSerializer {
var builder = new BinaryTupleBuilder(binaryTupleSize);
builder.appendString(receiver.receiverClassName());
- ClientBinaryTupleUtils.appendObject(builder, receiverArgs,
receiver.argumentMarshaller());
+ ClientBinaryTupleUtils.appendObject(builder, receiverArgs);
ClientBinaryTupleUtils.appendCollectionToBinaryTuple(builder, items);
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcBatchPreparedStmntRequest.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcBatchPreparedStmntRequest.java
index 5aa27342f2..46f5509ff8 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcBatchPreparedStmntRequest.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcBatchPreparedStmntRequest.java
@@ -132,7 +132,7 @@ public class JdbcBatchPreparedStmntRequest implements
ClientMessage {
packer.packInt(args.size());
for (Object[] arg : args) {
- packer.packObjectArrayAsBinaryTuple(arg, null);
+ packer.packObjectArrayAsBinaryTuple(arg);
}
packer.packLong(queryTimeoutMillis);
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryExecuteRequest.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryExecuteRequest.java
index 56159b5c94..99eb4545a7 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryExecuteRequest.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryExecuteRequest.java
@@ -188,7 +188,7 @@ public class JdbcQueryExecuteRequest implements
ClientMessage {
packer.packString(sqlQry);
packer.packBoolean(multiStatement);
- packer.packObjectArrayAsBinaryTuple(args, null);
+ packer.packObjectArrayAsBinaryTuple(args);
packer.packLong(queryTimeoutMillis);
}
diff --git
a/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ClientComputeJobPackerUnpackerTest.java
b/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ClientComputeJobPackerUnpackerTest.java
new file mode 100644
index 0000000000..bd64cf933e
--- /dev/null
+++
b/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ClientComputeJobPackerUnpackerTest.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.proto;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.Period;
+import java.time.temporal.ChronoUnit;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.ignite.marshalling.Marshaller;
+import org.apache.ignite.marshalling.UnmarshallingException;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+class ClientComputeJobPackerUnpackerTest {
+ private static Stream<Arguments> nativeTypes() {
+ return Stream.of(
+ (byte) 4, (short) 8, 15, 16L, 23.0f, 42.0d, "TEST_STRING",
null, UUID.randomUUID(),
+ LocalTime.now(), LocalDate.now(), LocalDateTime.now(),
Instant.now(), Period.of(1, 2, 3),
+ Duration.of(1, ChronoUnit.DAYS)
+ ).map(Arguments::of);
+ }
+
+ private static Stream<Arguments> tuples() {
+ return Stream.of(
+ null,
+ Tuple.create(),
+ Tuple.create().set("key", 1),
+ Tuple.create().set("key", "value"),
+ Tuple.create().set("col1", null).set("col2", true).set("col3",
(byte) 1).set("col4", (short) 2).set("col5", 3)
+ .set("col6", 4L).set("col7", 5.0f).set("col8",
6.0).set("col9", new BigDecimal("7.11"))
+ .set("col10", LocalDate.of(2024, 1, 1)).set("col11",
LocalTime.of(12, 0))
+ .set("col12", LocalDate.of(2024, 1,
1).atTime(LocalTime.of(12, 0)))
+ .set("col13",
UUID.fromString("123e4567-e89b-12d3-a456-426614174000")).set("col14", "string")
+ .set("col15", new byte[]{1, 2, 3}).set("col16",
Period.ofDays(10)).set("col17", Duration.ofDays(10))
+ ).map(Arguments::of);
+ }
+
+ private ClientMessagePacker messagePacker;
+
+ @BeforeEach
+ void setUp() {
+ messagePacker = new
ClientMessagePacker(PooledByteBufAllocator.DEFAULT.directBuffer());
+ }
+
+ @AfterEach
+ void tearDown() {
+ messagePacker.close();
+ }
+
+ @MethodSource({"tuples", "nativeTypes"})
+ @ParameterizedTest
+ void packUnpackNoMarshalling_jobArgument(Object arg) {
+ // When pack job argument without marshaller.
+ ClientComputeJobPacker.packJobArgument(arg, null, messagePacker);
+ byte[] data = ByteBufUtil.getBytes(messagePacker.getBuffer());
+
+ // And unpack job argument without marshaller.
+ try (var messageUnpacker = messageUnpacker(data)) {
+ var res = ClientComputeJobUnpacker.unpackJobArgument(null,
messageUnpacker);
+
+ // Then.
+ assertEquals(arg, res);
+ }
+ }
+
+ private ClientMessageUnpacker messageUnpacker(byte[] data) {
+ return new ClientMessageUnpacker(Unpooled.wrappedBuffer(data, 4,
data.length - 4));
+ }
+
+ @MethodSource({"tuples", "nativeTypes"})
+ @ParameterizedTest
+ void packUnpackNoMarshalling_jobResult(Object arg) {
+ // When pack job result without marshaller.
+ ClientComputeJobPacker.packJobResult(arg, null, messagePacker);
+ byte[] data = ByteBufUtil.getBytes(messagePacker.getBuffer());
+
+ // And unpack job result without marshaller.
+ try (var messageUnpacker = messageUnpacker(data)) {
+ var res = ClientComputeJobUnpacker.unpackJobResult(null,
messageUnpacker);
+
+ // Then.
+ assertEquals(arg, res);
+ }
+ }
+
+ @Test
+ void marshallingPackUnpack_jobResult() {
+ // Given.
+ Marshaller<String, byte[]> marshaller = new TestStringMarshaller();
+ var str = "Hi, marshal me!";
+
+ // When pack job result with marshaller.
+ ClientComputeJobPacker.packJobResult(str, marshaller, messagePacker);
+ byte[] data = ByteBufUtil.getBytes(messagePacker.getBuffer());
+
+ // And unpack job result with marshaller.
+ try (var messageUnpacker = messageUnpacker(data)) {
+ Object res = ClientComputeJobUnpacker.unpackJobResult(marshaller,
messageUnpacker);
+
+ // Then.
+ assertEquals(str, res);
+ }
+ }
+
+ @Test
+ void marshallingPackUnpack_jobArgument() {
+ // Given.
+ Marshaller<String, byte[]> marshaller = new TestStringMarshaller();
+ var str = "Hi, marshal me!";
+
+ // When pack job argument with marshaller.
+ ClientComputeJobPacker.packJobArgument(str, marshaller, messagePacker);
+ byte[] data = ByteBufUtil.getBytes(messagePacker.getBuffer());
+
+ // And unpack job argument with marshaller.
+ try (var messageUnpacker = messageUnpacker(data)) {
+ Object res =
ClientComputeJobUnpacker.unpackJobArgument(marshaller, messageUnpacker);
+
+ // Then.
+ assertEquals(str, res);
+ }
+ }
+
+ @Test
+ void packWithMarshallerUnpackWithout_jobResult() {
+ // Given.
+ Marshaller<String, byte[]> marshaller = new TestStringMarshaller();
+ var str = "Hi, marshal me!";
+
+ // When pack job result with marshaller.
+ ClientComputeJobPacker.packJobResult(str, marshaller, messagePacker);
+ byte[] data = ByteBufUtil.getBytes(messagePacker.getBuffer());
+
+ // And unpack job result without marshaller.
+ try (var messageUnpacker = messageUnpacker(data)) {
+ // Then the exception is thrown because it is not allowed to unpack
the marshalled object without marshaller.
+ assertThrows(
+ UnmarshallingException.class,
+ () -> ClientComputeJobUnpacker.unpackJobResult(null,
messageUnpacker)
+ );
+ }
+ }
+
+ @Test
+ void packWithMarshallerUnpackWithout_jobArgument() {
+ // Given.
+ Marshaller<String, byte[]> marshaller = new TestStringMarshaller();
+ var str = "Hi, marshal me!";
+
+ // When pack job argument with marshaller.
+ ClientComputeJobPacker.packJobArgument(str, marshaller, messagePacker);
+ byte[] data = ByteBufUtil.getBytes(messagePacker.getBuffer());
+
+ // And unpack job argument without marshaller.
+ try (var messageUnpacker = messageUnpacker(data)) {
+ // Then the exception is thrown because it is not allowed to unpack
the marshalled object without marshaller.
+ assertThrows(
+ UnmarshallingException.class,
+ () -> ClientComputeJobUnpacker.unpackJobArgument(null,
messageUnpacker)
+ );
+ }
+ }
+
+ @Test
+ void packByteArrayUnpackStringWithMarshaller_jobResult() {
+ // Given.
+ var str = "Hi, marshal me!";
+ var bytes = str.getBytes();
+
+ // When pack job result without marshaller.
+ ClientComputeJobPacker.packJobResult(bytes, null, messagePacker);
+ byte[] data = ByteBufUtil.getBytes(messagePacker.getBuffer());
+
+ // And unpack job result with marshaller.
+ try (var messageUnpacker = messageUnpacker(data)) {
+ // Then the exception is thrown because it is not allowed to
define the marshaller only for the result.
+ assertThrows(
+ UnmarshallingException.class,
+ () -> ClientComputeJobUnpacker.unpackJobResult(new
TestStringMarshaller(), messageUnpacker)
+ );
+ }
+ }
+
+ @Test
+ void packByteArrayUnpackStringWithMarshaller_jobArgument() {
+ // Given.
+ var str = "Hi, marshal me!";
+ byte[] bytes = str.getBytes();
+
+ // When pack job argument without marshaller.
+ ClientComputeJobPacker.packJobArgument(bytes, null, messagePacker);
+ byte[] data = ByteBufUtil.getBytes(messagePacker.getBuffer());
+
+ // And unpack job argument with marshaller.
+ try (var messageUnpacker = messageUnpacker(data)) {
+ // Then the exception is thrown because it is not allowed to
define the marshaller only for the argument.
+ assertThrows(
+ UnmarshallingException.class,
+ () -> ClientComputeJobUnpacker.unpackJobArgument(new
TestStringMarshaller(), messageUnpacker)
+ );
+ }
+ }
+
+ private static class TestStringMarshaller implements Marshaller<String,
byte[]> {
+ @Override
+ public byte[] marshal(String obj) {
+ return obj.getBytes();
+ }
+
+ @Override
+ public String unmarshal(byte[] bytes) {
+ return new String(bytes);
+ }
+ }
+}
diff --git
a/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ClientMessagePackerUnpackerTest.java
b/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ClientMessagePackerUnpackerTest.java
index be9301c4a3..9a9ccd497d 100644
---
a/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ClientMessagePackerUnpackerTest.java
+++
b/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ClientMessagePackerUnpackerTest.java
@@ -162,9 +162,9 @@ public class ClientMessagePackerUnpackerTest {
@Test
public void testObjectArrayAsBinaryTuple() {
try (var packer = new
ClientMessagePacker(PooledByteBufAllocator.DEFAULT.directBuffer())) {
- packer.packObjectArrayAsBinaryTuple(argsAllTypes, null);
- packer.packObjectArrayAsBinaryTuple(null, null);
- packer.packObjectArrayAsBinaryTuple(new Object[0], null);
+ packer.packObjectArrayAsBinaryTuple(argsAllTypes);
+ packer.packObjectArrayAsBinaryTuple(null);
+ packer.packObjectArrayAsBinaryTuple(new Object[0]);
byte[] data = ByteBufUtil.getBytes(packer.getBuffer());
@@ -184,7 +184,7 @@ public class ClientMessagePackerUnpackerTest {
public void testObjectAsBinaryTuple() {
try (var packer = new
ClientMessagePacker(PooledByteBufAllocator.DEFAULT.directBuffer())) {
for (Object arg : argsAllTypes) {
- packer.packObjectAsBinaryTuple(arg, null);
+ packer.packObjectAsBinaryTuple(arg);
}
byte[] data = ByteBufUtil.getBytes(packer.getBuffer());
diff --git
a/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ComputeJobTypeTest.java
b/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ComputeJobTypeTest.java
new file mode 100644
index 0000000000..ed73399710
--- /dev/null
+++
b/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ComputeJobTypeTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.proto;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.stream.Stream;
+import org.apache.ignite.internal.client.proto.ComputeJobType.Type;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+@TestInstance(Lifecycle.PER_CLASS)
+class ComputeJobTypeTest {
+ private static Stream<Arguments> marshalledTypes() {
+ return Stream.of(
+ Arguments.of(0, Type.NATIVE, 0),
+ Arguments.of(1, Type.MARSHALLED_TUPLE, 1),
+ Arguments.of(2, Type.MARSHALLED_OBJECT, 2)
+ );
+ }
+
+ @MethodSource({"marshalledTypes"})
+ @ParameterizedTest
+ void supportedTypes(int givenId, ComputeJobType.Type expectedType, int
expectedId) {
+ ComputeJobType computeJobType = new ComputeJobType(givenId);
+
+ assertEquals(expectedType, computeJobType.type());
+ assertEquals(expectedId, computeJobType.id());
+ }
+
+ @ParameterizedTest
+ @MethodSource("unsupportedTypesArgs")
+ void unsupportedTypes(int givenId) {
+ IllegalArgumentException e =
assertThrows(IllegalArgumentException.class, () -> new ComputeJobType(givenId));
+ assertEquals("Unsupported type id: " + givenId, e.getMessage());
+ }
+
+ private Stream<Arguments> unsupportedTypesArgs() {
+ return Stream.of(Integer.MIN_VALUE,
Integer.MAX_VALUE).map(Arguments::of);
+ }
+}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
index a3eccdf9e9..2fe1a7a237 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
@@ -69,8 +69,6 @@ public class ClientComputeExecuteColocatedRequest {
deploymentUnits,
jobClassName,
options,
- null,
- null,
args);
var jobExecution = compute.wrapJobExecutionFuture(jobExecutionFut);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
index 7869f3b3f0..b7bf2d4090 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
@@ -86,7 +86,7 @@ public class ClientComputeExecuteMapReduceRequest {
execution.stateAsync().whenComplete((state, errState) ->
execution.statesAsync().whenComplete((states,
errStates) ->
notificationSender.sendNotification(w -> {
- w.packObjectAsBinaryTuple(val, null);
+ w.packObjectAsBinaryTuple(val);
packTaskState(w, state);
packJobStates(w, states);
}, firstNotNull(err, errState, errStates)))
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
index 7ee2a78d0c..4307b8c5a3 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
@@ -28,11 +28,16 @@ import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobExecutionOptions;
import org.apache.ignite.compute.NodeNotFoundException;
import org.apache.ignite.deployment.DeploymentUnit;
+import org.apache.ignite.internal.client.proto.ClientComputeJobPacker;
+import org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.compute.IgniteComputeInternal;
+import org.apache.ignite.internal.compute.MarshallerProvider;
import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.marshalling.Marshaller;
import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
/**
* Compute execute request.
@@ -63,7 +68,7 @@ public class ClientComputeExecuteRequest {
Object arg = unpackPayload(in);
JobExecution<Object> execution = compute.executeAsyncWithFailover(
- candidates, deploymentUnits, jobClassName, options, null,
null, arg
+ candidates, deploymentUnits, jobClassName, options, arg
);
sendResultAndState(execution, notificationSender);
@@ -104,18 +109,27 @@ public class ClientComputeExecuteRequest {
return execution.resultAsync().whenComplete((val, err) ->
execution.stateAsync().whenComplete((state, errState) ->
notificationSender.sendNotification(w -> {
- w.packObjectAsBinaryTuple(val, null);
+ Marshaller<Object, byte[]> marshaller =
extractMarshaller(execution);
+ ClientComputeJobPacker.packJobResult(val,
marshaller, w);
packJobState(w, state);
}, err)));
}
+ private static <T> @Nullable Marshaller<T, byte[]>
extractMarshaller(JobExecution<T> e) {
+ if (e instanceof MarshallerProvider) {
+ return ((MarshallerProvider<T>) e).resultMarshaller();
+ }
+
+ return null;
+ }
+
/**
* Unpacks args.
*
* @param in Unpacker.
* @return Args array.
*/
- static Object unpackPayload(ClientMessageUnpacker in) {
- return in.unpackObjectFromBinaryTuple();
+ static @Nullable Object unpackPayload(ClientMessageUnpacker in) {
+ return ClientComputeJobUnpacker.unpackJobArgumentWithoutMarshaller(in);
}
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
index 36d7b49196..9396584a90 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
@@ -48,6 +48,7 @@ import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.internal.client.PayloadInputChannel;
import org.apache.ignite.internal.client.PayloadOutputChannel;
import org.apache.ignite.internal.client.ReliableChannel;
+import org.apache.ignite.internal.client.proto.ClientComputeJobPacker;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.internal.client.proto.TuplePart;
@@ -421,7 +422,7 @@ public class ClientCompute implements IgniteCompute {
w.packString(jobClassName);
w.packInt(options.priority());
w.packInt(options.maxRetries());
- w.packObjectAsBinaryTuple(args, marshaller);
+ ClientComputeJobPacker.packJobArgument(args, marshaller, w);
}
private static void packTask(ClientMessagePacker w,
@@ -431,7 +432,7 @@ public class ClientCompute implements IgniteCompute {
@Nullable Marshaller<Object, byte[]> marshaller) {
w.packDeploymentUnits(units);
w.packString(taskClassName);
- w.packObjectAsBinaryTuple(arg, marshaller);
+ ClientComputeJobPacker.packJobArgument(arg, marshaller, w);
}
/**
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
index f086c311e8..d31b52a9aa 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
@@ -29,6 +29,7 @@ import org.apache.ignite.compute.TaskState;
import org.apache.ignite.compute.TaskStatus;
import org.apache.ignite.internal.client.PayloadInputChannel;
import org.apache.ignite.internal.client.ReliableChannel;
+import org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.internal.compute.JobStateImpl;
@@ -65,11 +66,9 @@ class ClientJobExecution<R> implements JobExecution<R> {
.thenApply(r -> {
// Notifications require explicit input close.
try (r) {
- Object o = r.in().unpackObjectFromBinaryTuple();
- R result = marshaller != null ?
marshaller.unmarshal((byte[]) o) : (R) o;
-
+ Object result =
ClientComputeJobUnpacker.unpackJobResult(marshaller, r.in());
stateFuture.complete(unpackJobState(r));
- return result;
+ return (R) result;
}
});
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
index b1c1144b26..437b945c4d 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
@@ -232,7 +232,7 @@ public class ClientSql implements IgniteSql {
w.out().packString(statement.query());
- w.out().packObjectArrayAsBinaryTuple(arguments, null);
+ w.out().packObjectArrayAsBinaryTuple(arguments);
w.out().packLong(ch.observableTimestamp());
};
@@ -309,7 +309,7 @@ public class ClientSql implements IgniteSql {
packProperties(w, null);
w.out().packString(query);
- w.out().packObjectArrayAsBinaryTuple(arguments, null);
+ w.out().packObjectArrayAsBinaryTuple(arguments);
w.out().packLong(ch.observableTimestamp());
};
@@ -331,7 +331,7 @@ public class ClientSql implements IgniteSql {
if (statementProps != null) {
for (Entry<String, Object> entry : statementProps.entrySet()) {
builder.appendString(entry.getKey());
- ClientBinaryTupleUtils.appendObject(builder, entry.getValue(),
null);
+ ClientBinaryTupleUtils.appendObject(builder, entry.getValue());
}
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
index 9fc69ca097..2a323a24b8 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.compute.ComputeUtils;
import org.apache.ignite.internal.compute.IgniteComputeInternal;
import org.apache.ignite.internal.compute.JobExecutionContextImpl;
import org.apache.ignite.internal.compute.JobStateImpl;
+import org.apache.ignite.internal.compute.MarshallerProvider;
import org.apache.ignite.internal.compute.TaskStateImpl;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.util.ExceptionUtils;
@@ -95,8 +96,6 @@ public class FakeCompute implements IgniteComputeInternal {
List<DeploymentUnit> units,
String jobClassName,
JobExecutionOptions options,
- @Nullable Marshaller<Object, byte[]> argumentMarshaller,
- @Nullable Marshaller<R, byte[]> resultMarshaller,
Object args) {
if (Objects.equals(jobClassName, GET_UNITS)) {
String unitString =
units.stream().map(DeploymentUnit::render).collect(Collectors.joining(","));
@@ -135,8 +134,6 @@ public class FakeCompute implements IgniteComputeInternal {
List<DeploymentUnit> units,
String jobClassName,
JobExecutionOptions options,
- @Nullable Marshaller<Object, byte[]> argumentMarshaller,
- @Nullable Marshaller<R, byte[]> resultMarshaller,
Object args
) {
return completedFuture(jobExecution(future != null ? future :
completedFuture((R) nodeName)));
@@ -147,7 +144,7 @@ public class FakeCompute implements IgniteComputeInternal {
if (target instanceof AnyNodeJobTarget) {
Set<ClusterNode> nodes = ((AnyNodeJobTarget) target).nodes();
return executeAsyncWithFailover(
- nodes, descriptor.units(), descriptor.jobClassName(),
descriptor.options(), null, null, args
+ nodes, descriptor.units(), descriptor.jobClassName(),
descriptor.options(), args
);
} else if (target instanceof ColocatedJobTarget) {
return jobExecution(future != null ? future : completedFuture((R)
nodeName));
@@ -200,27 +197,43 @@ public class FakeCompute implements IgniteComputeInternal
{
JobState newState =
JobStateImpl.toBuilder(state).status(status).finishTime(Instant.now()).build();
jobStates.put(jobId, newState);
});
- return new JobExecution<>() {
- @Override
- public CompletableFuture<R> resultAsync() {
- return result;
- }
+ return new FakeJobExecution<>(result, jobId);
+ }
- @Override
- public CompletableFuture<@Nullable JobState> stateAsync() {
- return completedFuture(jobStates.get(jobId));
- }
+ private class FakeJobExecution<R> implements JobExecution<R>,
MarshallerProvider<R> {
+ private final CompletableFuture<R> result;
+ private final UUID jobId;
- @Override
- public CompletableFuture<@Nullable Boolean> cancelAsync() {
- return trueCompletedFuture();
- }
+ private FakeJobExecution(CompletableFuture<R> result, UUID jobId) {
+ this.result = result;
+ this.jobId = jobId;
+ }
- @Override
- public CompletableFuture<@Nullable Boolean>
changePriorityAsync(int newPriority) {
- return trueCompletedFuture();
- }
- };
+ @Override
+ public CompletableFuture<R> resultAsync() {
+ return result;
+ }
+
+ @Override
+ public CompletableFuture<@Nullable JobState> stateAsync() {
+ return completedFuture(jobStates.get(jobId));
+ }
+
+ @Override
+ public CompletableFuture<@Nullable Boolean> cancelAsync() {
+ return trueCompletedFuture();
+ }
+
+ @Override
+ public CompletableFuture<@Nullable Boolean> changePriorityAsync(int
newPriority) {
+ return trueCompletedFuture();
+ }
+
+
+ @Override
+ public Marshaller<R, byte[]> resultMarshaller() {
+ return null;
+ }
}
private <R> TaskExecution<R> taskExecution(CompletableFuture<R> result) {
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/DelegatingJobExecution.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/DelegatingJobExecution.java
index 3002b83ab0..37769e6715 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/DelegatingJobExecution.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/DelegatingJobExecution.java
@@ -18,9 +18,11 @@
package org.apache.ignite.internal.compute;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobState;
import org.apache.ignite.internal.compute.executor.JobExecutionInternal;
+import org.apache.ignite.marshalling.Marshaller;
import org.jetbrains.annotations.Nullable;
/**
@@ -28,7 +30,7 @@ import org.jetbrains.annotations.Nullable;
*
* @param <R> Result type.
*/
-class DelegatingJobExecution<R> implements JobExecution<R> {
+class DelegatingJobExecution<R> implements JobExecution<R>,
MarshallerProvider<R> {
private final CompletableFuture<JobExecutionInternal<R>> delegate;
DelegatingJobExecution(CompletableFuture<JobExecutionInternal<R>>
delegate) {
@@ -54,4 +56,13 @@ class DelegatingJobExecution<R> implements JobExecution<R> {
public CompletableFuture<@Nullable Boolean> changePriorityAsync(int
newPriority) {
return delegate.thenApply(jobExecutionInternal ->
jobExecutionInternal.changePriority(newPriority));
}
+
+ @Override
+ public @Nullable Marshaller<R, byte[]> resultMarshaller() {
+ try {
+ return
delegate.thenApply(JobExecutionInternal::resultMarshaller).get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java
index e8628ef804..a0932444cb 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java
@@ -36,6 +36,7 @@ import org.apache.ignite.compute.JobState;
import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
import org.apache.ignite.internal.compute.messaging.RemoteJobExecution;
import org.apache.ignite.internal.network.TopologyService;
+import org.apache.ignite.marshalling.Marshaller;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -92,6 +93,12 @@ public class ExecutionManager {
public CompletableFuture<?> resultAsync(UUID jobId) {
JobExecution<?> execution = executions.get(jobId);
if (execution != null) {
+ if (execution instanceof MarshallerProvider) {
+ Marshaller<Object, byte[]> marshaller = ((MarshallerProvider)
execution).resultMarshaller();
+ if (marshaller != null) {
+ return
execution.resultAsync().thenApply(marshaller::marshal);
+ }
+ }
return execution.resultAsync();
}
return failedFuture(new ComputeException(RESULT_NOT_FOUND_ERR, "Job
result not found for the job with ID: " + jobId));
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
index 1048c147e3..26cc364721 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
@@ -27,6 +27,7 @@ import org.apache.ignite.compute.JobState;
import org.apache.ignite.compute.JobStatus;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.marshalling.Marshaller;
import org.jetbrains.annotations.Nullable;
/**
@@ -40,7 +41,7 @@ import org.jetbrains.annotations.Nullable;
*
* @param <T> the type of the job result.
*/
-class FailSafeJobExecution<T> implements JobExecution<T> {
+class FailSafeJobExecution<T> implements JobExecution<T>,
MarshallerProvider<T> {
private static final IgniteLogger LOG =
Loggers.forClass(FailSafeJobExecution.class);
/**
@@ -179,4 +180,14 @@ class FailSafeJobExecution<T> implements JobExecution<T> {
throw new IllegalStateException("Job is already completed
exceptionally.");
}
}
+
+ @Override
+ public @Nullable Marshaller<T, byte[]> resultMarshaller() {
+ JobExecution<T> exec = runningJobExecution.get();
+ if (exec instanceof MarshallerProvider) {
+ return ((MarshallerProvider<T>) exec).resultMarshaller();
+ }
+
+ return null;
+ }
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index 5ca872ebc7..3428396b08 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -24,6 +24,7 @@ import static
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.mapToPub
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static
org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_FAILED_ERR;
+import static org.apache.ignite.marshalling.Marshaller.tryMarshalOrCast;
import java.util.Collection;
import java.util.HashSet;
@@ -112,8 +113,8 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
Objects.requireNonNull(target);
Objects.requireNonNull(descriptor);
- Marshaller<Object, byte[]> argumentMarshaller = (Marshaller<Object,
byte[]>) descriptor.argumentMarshaller();
- Marshaller<Object, byte[]> resultMarshaller = (Marshaller<Object,
byte[]>) descriptor.resultMarshaller();
+ Marshaller<T, byte[]> argumentMarshaller =
descriptor.argumentMarshaller();
+ Marshaller<R, byte[]> resultMarshaller = descriptor.resultMarshaller();
if (target instanceof AnyNodeJobTarget) {
Set<ClusterNode> nodes = ((AnyNodeJobTarget) target).nodes();
@@ -121,16 +122,22 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
if (nodes.size() == 1) {
ClusterNode node = nodes.iterator().next();
if (node.id().equals(topologyService.localMember().id())) {
- return (JobExecution<R>) executeAsyncWithFailover(
- nodes, descriptor.units(),
descriptor.jobClassName(), descriptor.options(),
- argumentMarshaller, resultMarshaller, args
+ return new ResultUnmarshallingJobExecution<>(
+ executeAsyncWithFailover(
+ nodes, descriptor.units(),
descriptor.jobClassName(), descriptor.options(),
+ tryMarshalOrCast(argumentMarshaller, args)
+ ),
+ resultMarshaller
);
}
}
- return (JobExecution<R>) executeAsyncWithFailover(
- nodes, descriptor.units(), descriptor.jobClassName(),
descriptor.options(),
- argumentMarshaller, resultMarshaller, args
+ return new ResultUnmarshallingJobExecution<>(
+ executeAsyncWithFailover(
+ nodes, descriptor.units(),
descriptor.jobClassName(), descriptor.options(),
+ tryMarshalOrCast(argumentMarshaller, args)
+ ),
+ resultMarshaller
);
}
@@ -144,15 +151,13 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
if (mapper != null) {
jobFut = requiredTable(tableName)
.thenCompose(table ->
primaryReplicaForPartitionByMappedKey(table, key, mapper)
- .thenApply(primaryNode -> (JobExecution<R>)
executeOnOneNodeWithFailover(
+ .thenApply(primaryNode ->
executeOnOneNodeWithFailover(
primaryNode,
new
NextColocatedWorkerSelector<>(placementDriver, topologyService, clock, table,
key, mapper),
descriptor.units(),
descriptor.jobClassName(),
descriptor.options(),
- argumentMarshaller,
- resultMarshaller,
- args
+ tryMarshalOrCast(argumentMarshaller,
args)
)));
} else {
@@ -163,13 +168,11 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
descriptor.units(),
descriptor.jobClassName(),
descriptor.options(),
- argumentMarshaller,
- resultMarshaller,
- args))
+ tryMarshalOrCast(argumentMarshaller, args)))
.thenApply(job -> (JobExecution<R>) job);
}
- return new JobExecutionFutureWrapper<>(jobFut);
+ return new ResultUnmarshallingJobExecution<>(new
JobExecutionFutureWrapper<>(jobFut), resultMarshaller);
}
throw new IllegalArgumentException("Unsupported job target: " +
target);
@@ -186,9 +189,7 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
List<DeploymentUnit> units,
String jobClassName,
JobExecutionOptions options,
- @Nullable Marshaller<Object, byte[]> argumentMarshaller,
- @Nullable Marshaller<R, byte[]> resultMarshaller,
- Object args
+ @Nullable Object args
) {
Set<ClusterNode> candidates = new HashSet<>();
for (ClusterNode node : nodes) {
@@ -213,8 +214,6 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
units,
jobClassName,
options,
- argumentMarshaller,
- resultMarshaller,
args
));
}
@@ -236,24 +235,15 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
List<DeploymentUnit> units,
String jobClassName,
JobExecutionOptions jobExecutionOptions,
- @Nullable Marshaller<T, byte[]> argumentMarshaller,
- @Nullable Marshaller<R, byte[]> resultMarshaller,
@Nullable T payload
) {
ExecutionOptions options = ExecutionOptions.from(jobExecutionOptions);
- Object marshalledArg = argumentMarshaller == null ? payload :
argumentMarshaller.marshal(payload);
if (isLocal(targetNode)) {
- return new ResultMarshallingJobExecution<>(
- computeComponent.executeLocally(options, units,
jobClassName, marshalledArg),
- resultMarshaller
- );
+ return computeComponent.executeLocally(options, units,
jobClassName, payload);
} else {
- return new ResultMarshallingJobExecution<>(
- computeComponent.executeRemotelyWithFailover(
- targetNode, nextWorkerSelector, units,
jobClassName, options, marshalledArg
- ),
- resultMarshaller
+ return computeComponent.executeRemotelyWithFailover(
+ targetNode, nextWorkerSelector, units, jobClassName,
options, payload
);
}
}
@@ -286,14 +276,12 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
List<DeploymentUnit> units,
String jobClassName,
JobExecutionOptions options,
- @Nullable Marshaller<Object, byte[]> argumentMarshaller,
- @Nullable Marshaller<R, byte[]> resultMarshaller,
- Object arg) {
+ @Nullable Object arg) {
return primaryReplicaForPartitionByTupleKey(table, key)
- .thenApply(primaryNode -> (JobExecution<R>)
executeOnOneNodeWithFailover(
+ .thenApply(primaryNode -> executeOnOneNodeWithFailover(
primaryNode,
new NextColocatedWorkerSelector<>(placementDriver,
topologyService, clock, table, key),
- units, jobClassName, options, argumentMarshaller,
(Marshaller<Object, byte[]>) resultMarshaller, arg
+ units, jobClassName, options, arg
));
}
@@ -344,6 +332,8 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
Objects.requireNonNull(descriptor);
Marshaller<T, byte[]> argumentMarshaller =
descriptor.argumentMarshaller();
+ Marshaller<R, byte[]> resultMarshaller = descriptor.resultMarshaller();
+
return nodes.stream()
.collect(toUnmodifiableMap(identity(),
// No failover nodes for broadcast. We use failover
here in order to complete futures with exceptions
@@ -352,10 +342,13 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
if (topologyService.getByConsistentId(node.name())
== null) {
return new FailedExecution<>(new
NodeNotFoundException(Set.of(node.name())));
}
- return new JobExecutionWrapper<>((JobExecution<R>)
executeOnOneNodeWithFailover(
- node,
CompletableFutures::nullCompletedFuture,
- descriptor.units(),
descriptor.jobClassName(), descriptor.options(),
- argumentMarshaller, (Marshaller<Object,
byte[]>) descriptor.resultMarshaller(), args));
+ return new ResultUnmarshallingJobExecution<>(
+ new JobExecutionWrapper<>(
+ executeOnOneNodeWithFailover(
+ node,
CompletableFutures::nullCompletedFuture,
+ descriptor.units(),
descriptor.jobClassName(),
+ descriptor.options(),
tryMarshalOrCast(argumentMarshaller, args))),
+ resultMarshaller);
}));
}
@@ -417,7 +410,6 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
deploymentUnits,
StreamerReceiverJob.class.getName(),
JobExecutionOptions.DEFAULT,
- null, null,
payload);
return jobExecution.resultAsync()
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
index cdc2e12b4f..352b4563d4 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
@@ -29,7 +29,6 @@ import org.apache.ignite.compute.JobExecutionOptions;
import org.apache.ignite.compute.JobState;
import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.internal.table.TableViewInternal;
-import org.apache.ignite.marshalling.Marshaller;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.Tuple;
import org.jetbrains.annotations.Nullable;
@@ -47,8 +46,6 @@ public interface IgniteComputeInternal extends IgniteCompute {
* @param units Deployment units. Can be empty.
* @param jobClassName Name of the job class to execute.
* @param options Job execution options.
- * @param argumentMarshaller Marshaller for the job argument.
- * @param resultMarshaller Marshaller for the job result.
* @param payload Arguments of the job.
* @return CompletableFuture Job result.
*/
@@ -57,9 +54,7 @@ public interface IgniteComputeInternal extends IgniteCompute {
List<DeploymentUnit> units,
String jobClassName,
JobExecutionOptions options,
- @Nullable Marshaller<Object, byte[]> argumentMarshaller,
- @Nullable Marshaller<R, byte[]> resultMarshaller,
- Object payload
+ @Nullable Object payload
);
/**
@@ -71,8 +66,6 @@ public interface IgniteComputeInternal extends IgniteCompute {
* @param units Deployment units. Can be empty.
* @param jobClassName Name of the job class to execute.
* @param options job execution options (priority, max retries).
- * @param argumentMarshaller Marshaller for the job argument.
- * @param resultMarshaller Marshaller for the job result.
* @param payload Arguments of the job.
* @param <R> Job result type.
* @return Job execution object.
@@ -83,8 +76,6 @@ public interface IgniteComputeInternal extends IgniteCompute {
List<DeploymentUnit> units,
String jobClassName,
JobExecutionOptions options,
- @Nullable Marshaller<Object, byte[]> argumentMarshaller,
- @Nullable Marshaller<R, byte[]> resultMarshaller,
Object payload);
/**
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionWrapper.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionWrapper.java
index 4fabbbd62c..5e19c48687 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionWrapper.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionWrapper.java
@@ -23,6 +23,7 @@ import static
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.convertT
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobState;
+import org.apache.ignite.marshalling.Marshaller;
import org.jetbrains.annotations.Nullable;
/**
@@ -30,7 +31,7 @@ import org.jetbrains.annotations.Nullable;
*
* @param <R> Result type.
*/
-public class JobExecutionWrapper<R> implements JobExecution<R> {
+public class JobExecutionWrapper<R> implements JobExecution<R>,
MarshallerProvider<R> {
private final JobExecution<R> delegate;
JobExecutionWrapper(JobExecution<R> delegate) {
@@ -56,4 +57,13 @@ public class JobExecutionWrapper<R> implements
JobExecution<R> {
public CompletableFuture<@Nullable Boolean> changePriorityAsync(int
newPriority) {
return
convertToPublicFuture(delegate.changePriorityAsync(newPriority));
}
+
+ @Override
+ public @Nullable Marshaller<R, byte[]> resultMarshaller() {
+ if (delegate instanceof MarshallerProvider) {
+ return ((MarshallerProvider<R>) delegate).resultMarshaller();
+ }
+
+ return null;
+ }
}
diff --git a/modules/binary-tuple/build.gradle
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/MarshallerProvider.java
similarity index 65%
copy from modules/binary-tuple/build.gradle
copy to
modules/compute/src/main/java/org/apache/ignite/internal/compute/MarshallerProvider.java
index 8200416793..c8570dd1bb 100644
--- a/modules/binary-tuple/build.gradle
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/MarshallerProvider.java
@@ -15,16 +15,16 @@
* limitations under the License.
*/
-apply from: "$rootDir/buildscripts/java-core.gradle"
-apply from: "$rootDir/buildscripts/publishing.gradle"
-apply from: "$rootDir/buildscripts/java-junit5.gradle"
+package org.apache.ignite.internal.compute;
-description = 'ignite-binary-tuple'
+import org.apache.ignite.marshalling.Marshaller;
+import org.jetbrains.annotations.Nullable;
-dependencies {
- annotationProcessor project(':ignite-configuration-annotation-processor')
- implementation project(':ignite-api')
- implementation project(':ignite-core')
- implementation libs.jetbrains.annotations
+/**
+ * Marshaller provider.
+ */
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
+public interface MarshallerProvider<R> {
+ /** Returns marshaller or null. */
+ @Nullable Marshaller<R, byte[]> resultMarshaller();
}
-
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ResultMarshallingJobExecution.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ResultMarshallingJobExecution.java
index 134353f4a3..9c3caa6457 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ResultMarshallingJobExecution.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ResultMarshallingJobExecution.java
@@ -29,25 +29,18 @@ import org.jetbrains.annotations.Nullable;
*
* @param <R> Result type.
*/
-class ResultMarshallingJobExecution<R> implements JobExecution<R> {
+class ResultMarshallingJobExecution<R> implements JobExecution<R>,
MarshallerProvider<R> {
private final JobExecution<R> delegate;
private final Marshaller<R, byte[]> resultMarshaller;
- ResultMarshallingJobExecution(JobExecution<R> delegate, Marshaller<R,
byte[]> resultMarshaller) {
+ ResultMarshallingJobExecution(JobExecution<R> delegate, @Nullable
Marshaller<R, byte[]> resultMarshaller) {
this.delegate = delegate;
this.resultMarshaller = resultMarshaller;
}
@Override
public CompletableFuture<R> resultAsync() {
- return delegate.resultAsync()
- .thenApply(res -> {
- if (resultMarshaller == null) {
- return res;
- }
-
- return resultMarshaller.unmarshal((byte[]) res);
- });
+ return delegate.resultAsync();
}
@Override
@@ -64,4 +57,9 @@ class ResultMarshallingJobExecution<R> implements
JobExecution<R> {
public CompletableFuture<@Nullable Boolean> changePriorityAsync(int
newPriority) {
return delegate.changePriorityAsync(newPriority);
}
+
+ @Override
+ public Marshaller<R, byte[]> resultMarshaller() {
+ return resultMarshaller;
+ }
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ResultMarshallingJobExecution.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ResultUnmarshallingJobExecution.java
similarity index 70%
copy from
modules/compute/src/main/java/org/apache/ignite/internal/compute/ResultMarshallingJobExecution.java
copy to
modules/compute/src/main/java/org/apache/ignite/internal/compute/ResultUnmarshallingJobExecution.java
index 134353f4a3..b6145258a0 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ResultMarshallingJobExecution.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ResultUnmarshallingJobExecution.java
@@ -17,37 +17,31 @@
package org.apache.ignite.internal.compute;
+import static org.apache.ignite.marshalling.Marshaller.tryUnmarshalOrCast;
+
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobState;
-import org.apache.ignite.internal.compute.executor.JobExecutionInternal;
import org.apache.ignite.marshalling.Marshaller;
import org.jetbrains.annotations.Nullable;
/**
- * Delegates {@link JobExecution} to the future of {@link
JobExecutionInternal}.
+ * Wraps {@link #resultAsync()} with unmarshalling of the job result.
*
* @param <R> Result type.
*/
-class ResultMarshallingJobExecution<R> implements JobExecution<R> {
+class ResultUnmarshallingJobExecution<R> implements JobExecution<R> {
private final JobExecution<R> delegate;
- private final Marshaller<R, byte[]> resultMarshaller;
+ private final Marshaller<R, byte[]> resultUnmarshaller;
- ResultMarshallingJobExecution(JobExecution<R> delegate, Marshaller<R,
byte[]> resultMarshaller) {
+ ResultUnmarshallingJobExecution(JobExecution<R> delegate, @Nullable
Marshaller<R, byte[]> resultUnmarshaller) {
this.delegate = delegate;
- this.resultMarshaller = resultMarshaller;
+ this.resultUnmarshaller = resultUnmarshaller;
}
@Override
public CompletableFuture<R> resultAsync() {
- return delegate.resultAsync()
- .thenApply(res -> {
- if (resultMarshaller == null) {
- return res;
- }
-
- return resultMarshaller.unmarshal((byte[]) res);
- });
+ return delegate.resultAsync().thenApply(r ->
tryUnmarshalOrCast(resultUnmarshaller, r));
}
@Override
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
index 1d7ba15ca8..a6d2216ad2 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
@@ -94,37 +94,21 @@ public class ComputeExecutorImpl implements ComputeExecutor
{
Marshaller<R, byte[]> resultMarshaller =
jobInstance.resultMarshaller();
QueueExecution<R> execution = executorService.submit(
- unmarshalExecMarshal(input, jobInstance, context,
inputMarshaller, resultMarshaller),
+ unmarshalExecMarshal(input, jobInstance, context,
inputMarshaller),
options.priority(),
options.maxRetries()
);
- return new JobExecutionInternal<>(execution, isInterrupted);
+ return new JobExecutionInternal<>(execution, isInterrupted,
resultMarshaller);
}
private static <T, R> Callable<CompletableFuture<R>> unmarshalExecMarshal(
T input,
ComputeJob<T, R> jobInstance,
JobExecutionContext context,
- @Nullable Marshaller<T, byte[]> inputMarshaller,
- @Nullable Marshaller<R, byte[]> resultMarshaller
+ @Nullable Marshaller<T, byte[]> inputMarshaller
) {
- return () -> {
- var fut = jobInstance.executeAsync(context,
unmarshallOrNotIfNull(inputMarshaller, input));
- if (fut != null) {
- return (CompletableFuture<R>) fut.thenApply(res ->
marshallOrNull(res, resultMarshaller));
- }
- return null;
- };
- }
-
-
- private static <R> Object marshallOrNull(Object res, @Nullable
Marshaller<R, byte[]> marshaller) {
- if (marshaller == null) {
- return res;
- }
-
- return marshaller.marshal((R) res);
+ return () -> jobInstance.executeAsync(context,
unmarshallOrNotIfNull(inputMarshaller, input));
}
private static <T> @Nullable T unmarshallOrNotIfNull(@Nullable
Marshaller<T, byte[]> marshaller, Object input) {
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
index 219171b372..6b8313dbb1 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.compute.executor;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.compute.JobState;
+import org.apache.ignite.internal.compute.MarshallerProvider;
import org.apache.ignite.internal.compute.queue.QueueExecution;
+import org.apache.ignite.marshalling.Marshaller;
import org.jetbrains.annotations.Nullable;
/**
@@ -28,20 +30,23 @@ import org.jetbrains.annotations.Nullable;
*
* @param <R> Job result type.
*/
-public class JobExecutionInternal<R> {
+public class JobExecutionInternal<R> implements MarshallerProvider<R> {
private final QueueExecution<R> execution;
private final AtomicBoolean isInterrupted;
+ private final Marshaller<R, byte[]> marshaller;
+
/**
* Constructor.
*
* @param execution Internal execution state.
* @param isInterrupted Flag which is passed to the execution context so
that the job can check it for cancellation request.
*/
- JobExecutionInternal(QueueExecution<R> execution, AtomicBoolean
isInterrupted) {
+ JobExecutionInternal(QueueExecution<R> execution, AtomicBoolean
isInterrupted, @Nullable Marshaller<R, byte[]> marshaller) {
this.execution = execution;
this.isInterrupted = isInterrupted;
+ this.marshaller = marshaller;
}
public CompletableFuture<R> resultAsync() {
@@ -72,4 +77,9 @@ public class JobExecutionInternal<R> {
public boolean changePriority(int newPriority) {
return execution.changePriority(newPriority);
}
+
+ @Override
+ public @Nullable Marshaller<R, byte[]> resultMarshaller() {
+ return marshaller;
+ }
}
diff --git
a/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp
b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp
index 1de9bc3e4e..2cab70d30c 100644
--- a/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp
+++ b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp
@@ -24,24 +24,72 @@
namespace ignite::detail {
+/**
+ * Compute Job Type.
+ */
+enum class compute_job_type {
+ /** Native. */
+ NATIVE = 0,
+
+ /** Marshalled Tuple. */
+ MARSHALLED_TUPLE = 1,
+
+ /** Marshalled Object. */
+ MARSHALLED_OBJECT = 2,
+};
+
+
+/**
+ * Get Compute Job Type from Type ID.
+ * @param type_id Type ID.
+ * @return MARSHALLED_TUPLE or MARSHALLED_OBJECT if the ID matches one of them, NATIVE otherwise.
+ */
+compute_job_type from_type_id(ignite_type type_id) {
+ auto type_conv = compute_job_type(type_id);
+
+ if (type_conv == compute_job_type::MARSHALLED_TUPLE || type_conv ==
compute_job_type::MARSHALLED_OBJECT)
+ return type_conv;
+
+ return compute_job_type::NATIVE;
+}
+
+
/**
* Write a collection of primitives as a binary tuple.
*
* @param writer Writer to use.
* @param arg Argument.
*/
-void write_object_as_binary_tuple(protocol::writer &writer, const
binary_object &arg) {
+void write_object_as_binary_tuple(protocol::writer &writer, const primitive
&arg) {
binary_tuple_builder args_builder{3};
args_builder.start();
- protocol::claim_primitive_with_type(args_builder, arg.get_primitive());
+ protocol::claim_primitive_with_type(args_builder, arg);
args_builder.layout();
- protocol::append_primitive_with_type(args_builder, arg.get_primitive());
+ protocol::append_primitive_with_type(args_builder, arg);
auto args_data = args_builder.build();
writer.write_binary(args_data);
}
+/**
+ * Pack compute argument.
+ *
+ * @param writer Writer.
+ * @param arg Argument.
+ */
+void pack_compute_argument(protocol::writer &writer, const binary_object &arg)
{
+ auto prim = arg.get_primitive();
+ if (prim.is_null()) {
+ writer.write(std::int32_t(ignite_type::NIL));
+ writer.write_nil();
+ return;
+ }
+
+ writer.write(std::int32_t(compute_job_type::NATIVE));
+ write_object_as_binary_tuple(writer, prim);
+}
+
/**
* Read primitive from a stream, which is encoded as a binary tuple.
*
@@ -58,14 +106,17 @@ primitive
read_primitive_from_binary_tuple(protocol::reader &reader) {
}
/**
- * Read primitive from a stream, which is encoded as a binary tuple.
+ * Unpack compute execution result.
*
* @param reader Reader.
* @return Value.
*/
-std::optional<primitive>
read_primitive_from_binary_tuple_nullable(protocol::reader &reader) {
- if (reader.try_read_nil())
- return std::nullopt;
+primitive unpack_compute_result(protocol::reader &reader) {
+ auto type_id = ignite_type(reader.read_int32());
+ auto job_id = from_type_id(type_id);
+
+ if (job_id != compute_job_type::NATIVE)
+ throw ignite_error("Only native compute results are supported
currently");
return read_primitive_from_binary_tuple(reader);
}
@@ -190,7 +241,7 @@ public:
job_state state;
auto read_res = result_of_operation<void>([&]() {
- res = read_primitive_from_binary_tuple_nullable(reader);
+ res = unpack_compute_result(reader);
state = read_job_state(reader);
});
@@ -240,7 +291,7 @@ void compute_impl::submit_to_nodes(const
std::vector<cluster_node> &nodes, std::
writer.write(descriptor->get_execution_options().get_priority());
writer.write(descriptor->get_execution_options().get_max_retries());
- write_object_as_binary_tuple(writer, arg);
+ pack_compute_argument(writer, arg);
};
auto handler =
std::make_shared<response_handler_compute>(shared_from_this(),
std::move(callback), false);
@@ -278,7 +329,7 @@ void compute_impl::submit_colocated_async(const std::string
&table_name, const i
writer.write(descriptor->get_execution_options().get_priority());
writer.write(descriptor->get_execution_options().get_max_retries());
- write_object_as_binary_tuple(writer, arg);
+ pack_compute_argument(writer, arg);
};
auto handler =
std::make_shared<response_handler_compute>(self, std::move(callback), true);
diff --git a/modules/platforms/cpp/ignite/common/error_codes.h
b/modules/platforms/cpp/ignite/common/error_codes.h
index 26786ecf72..0f1706c64a 100644
--- a/modules/platforms/cpp/ignite/common/error_codes.h
+++ b/modules/platforms/cpp/ignite/common/error_codes.h
@@ -225,7 +225,8 @@ enum class code : underlying_t {
// Marshalling group. Group code: 22
COMMON = 0x160001,
- UNSUPPORTED_OBJECT_TYPE = 0x160002
+ UNSUPPORTED_OBJECT_TYPE = 0x160002,
+ UNMARSHALLING = 0x160003
};
} // namespace error
diff --git a/modules/platforms/cpp/ignite/odbc/common_types.cpp
b/modules/platforms/cpp/ignite/odbc/common_types.cpp
index f88e6bdd6a..4d767591d4 100644
--- a/modules/platforms/cpp/ignite/odbc/common_types.cpp
+++ b/modules/platforms/cpp/ignite/odbc/common_types.cpp
@@ -316,6 +316,7 @@ sql_state error_code_to_sql_state(error::code code) {
case error::code::COMMON:
case error::code::UNSUPPORTED_OBJECT_TYPE:
case error::code::MARSHALLING_TYPE_MISMATCH:
+ case error::code::UNMARSHALLING:
return sql_state::SHY000_GENERAL_ERROR;
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index fccce89d2a..bcf3209dbb 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -30,6 +30,7 @@ namespace Apache.Ignite.Tests
using Ignite.Sql;
using Internal.Buffers;
using Internal.Common;
+ using Internal.Compute;
using Internal.Network;
using Internal.Proto;
using Internal.Proto.BinaryTuple;
@@ -771,6 +772,7 @@ namespace Apache.Ignite.Tests
var arrayBufferWriter = new PooledArrayBuffer();
var writer = new MsgPackWriter(arrayBufferWriter);
+ writer.Write(ComputePacker.Native); // ComputePacker.Native
writer.Write(builder.Build().Span);
// Status
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
index 561b740d93..f5e10723d9 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
@@ -639,6 +639,9 @@ namespace Apache.Ignite
/// <summary> UnsupportedObjectType error. </summary>
public const int UnsupportedObjectType = (GroupCode << 16) | (2 &
0xFFFF);
+
+ /// <summary> Unmarshalling error. </summary>
+ public const int Unmarshalling = (GroupCode << 16) | (3 & 0xFFFF);
}
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
index 71335106fd..ed988ff82e 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
@@ -126,7 +126,7 @@ namespace Apache.Ignite.Internal.Compute
WriteUnits(taskDescriptor.DeploymentUnits, writer);
w.Write(taskDescriptor.TaskClassName);
- w.WriteObjectAsBinaryTuple(arg);
+ ComputePacker.PackArg(ref w, arg, null);
}
}
@@ -325,7 +325,7 @@ namespace Apache.Ignite.Internal.Compute
(T, JobState) Read(MsgPackReader reader)
{
- var res = (T)reader.ReadObjectFromBinaryTuple(marshaller)!;
+ var res = ComputePacker.UnpackResult(ref reader, marshaller);
var status = ReadJobState(reader);
return (res, status);
@@ -392,7 +392,7 @@ namespace Apache.Ignite.Internal.Compute
w.Write(options.Priority);
w.Write(options.MaxRetries);
- w.WriteObjectAsBinaryTuple(arg, jobDescriptor.ArgMarshaller);
+ ComputePacker.PackArg(ref w, arg, jobDescriptor.ArgMarshaller);
}
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/ComputePacker.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/ComputePacker.cs
new file mode 100644
index 0000000000..9fe5c5d36a
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/ComputePacker.cs
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Compute;
+
+using System;
+using System.Buffers;
+using Ignite.Table;
+using Marshalling;
+using Proto.MsgPack;
+
+/// <summary>
+/// Compute packer utils.
+/// </summary>
+internal static class ComputePacker
+{
+ /// <summary>
+ /// Natively supported simple type.
+ /// </summary>
+ internal const int Native = 0;
+
+ /// <summary>
+ /// Ignite tuple.
+ /// </summary>
+ private const int Tuple = 1;
+
+ /// <summary>
+ /// User-defined marshaller.
+ /// </summary>
+ private const int MarshallerObject = 2;
+
+ /// <summary>
+ /// Packs compute job arg.
+ /// </summary>
+ /// <param name="w">Packer.</param>
+ /// <param name="obj">Arg.</param>
+ /// <param name="marshaller">Marshaller.</param>
+ /// <typeparam name="T">Arg type.</typeparam>
+ internal static void PackArg<T>(ref MsgPackWriter w, T obj,
IMarshaller<T>? marshaller)
+ {
+ if (obj == null)
+ {
+ w.WriteNil();
+ return;
+ }
+
+ if (marshaller != null)
+ {
+ w.Write(MarshallerObject);
+ w.Write(
+ static (IBufferWriter<byte> writer, (T Obj, IMarshaller<T>
Marshaller) arg) => arg.Marshaller.Marshal(arg.Obj, writer),
+ arg: (obj, marshaller));
+
+ return;
+ }
+
+ if (obj is IIgniteTuple)
+ {
+ // TODO: IGNITE-23033 .NET: Thin 3.0: Support tuples with schemas in Compute
+ w.Write(Tuple);
+ throw new NotImplementedException("IGNITE-23033");
+ }
+
+ w.Write(Native);
+ w.WriteObjectAsBinaryTuple(obj);
+ }
+
+ /// <summary>
+ /// Unpacks compute job result.
+ /// </summary>
+ /// <param name="r">Reader.</param>
+ /// <param name="marshaller">Optional marshaller.</param>
+ /// <typeparam name="T">Result type.</typeparam>
+ /// <returns>Result.</returns>
+ internal static T UnpackResult<T>(ref MsgPackReader r, IMarshaller<T>?
marshaller)
+ {
+ if (r.TryReadNil())
+ {
+ return (T)(object)null!;
+ }
+
+ int type = r.ReadInt32();
+
+ return type switch
+ {
+ Tuple => throw new NotImplementedException("IGNITE-23033"),
+ MarshallerObject => Unmarshal(ref r, marshaller),
+ _ => (T)r.ReadObjectFromBinaryTuple()!
+ };
+
+ static T Unmarshal(ref MsgPackReader r, IMarshaller<T>? marshaller)
+ {
+ if (marshaller == null)
+ {
+ throw new ArgumentNullException(nameof(marshaller), "Compute
job result marshaller is required but not provided.");
+ }
+
+ if (r.TryReadNil())
+ {
+ return (T)(object)null!;
+ }
+
+ var bytes = r.ReadBinary();
+
+ return marshaller.Unmarshal(bytes);
+ }
+ }
+}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackReader.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackReader.cs
index 6d0ae3d461..b987248d1b 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackReader.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackReader.cs
@@ -366,40 +366,6 @@ internal ref struct MsgPackReader
}
}
- /// <summary>
- /// Reads <see cref="ColumnType"/> and value with optional marshaller.
- /// </summary>
- /// <param name="marshaller">Optional marshaller.</param>
- /// <returns>Value.</returns>
- /// <typeparam name="T">Type of the value.</typeparam>
- public T ReadObjectFromBinaryTuple<T>(IMarshaller<T>? marshaller)
- {
- if (TryReadNil())
- {
- return (T)(object)null!;
- }
-
- if (marshaller == null)
- {
- return (T)ReadObjectFromBinaryTuple()!;
- }
-
- var tuple = new BinaryTupleReader(ReadBinary(), 3);
- var type = (ColumnType)tuple.GetInt(0);
-
- if (type != ColumnType.ByteArray)
- {
- throw new UnsupportedObjectTypeMarshallingException(
- Guid.NewGuid(),
- ErrorGroups.Marshalling.UnsupportedObjectType,
- "Unsupported object type. Expected byte[], got " + type);
- }
-
- var bytes = tuple.GetBytesSpan(2);
-
- return marshaller.Unmarshal(bytes);
- }
-
/// <summary>
/// Reads <see cref="ColumnType"/> and value.
/// </summary>
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
index 9e92058337..36c1ff5a8f 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
@@ -343,6 +343,26 @@ internal readonly ref struct MsgPackWriter
span.CopyTo(Buf.GetSpanAndAdvance(span.Length));
}
+ /// <summary>
+ /// Appends bytes using <see cref="IBufferWriter{T}"/> directly to the underlying buffer, avoiding extra copying.
+ /// </summary>
+ /// <param name="action">Appender action.</param>
+ /// <param name="arg">Argument.</param>
+ /// <typeparam name="TArg">Argument type.</typeparam>
+ public void Write<TArg>(Action<IBufferWriter<byte>, TArg> action, TArg arg)
+ {
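+ // Reserve 5 bytes for the Bin32 header (code + 4-byte length). NOTE(review): confirm sizeSpan stays valid if action() grows Buf.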
+ var sizeSpan = Buf.GetSpanAndAdvance(5);
+ sizeSpan[0] = MsgPackCode.Bin32;
+
+ var startPos = Buf.Position;
+ action(Buf, arg);
+ var length = Buf.Position - startPos;
+
+ // Write size to the reserved space.
+ BinaryPrimitives.WriteUInt32BigEndian(sizeSpan[1..], (uint)length);
+ }
+
/// <summary>
/// Writes a transaction.
/// </summary>
@@ -359,36 +379,6 @@ internal readonly ref struct MsgPackWriter
}
}
- /// <summary>
- /// Writes an object with type code.
- /// </summary>
- /// <param name="obj">Object.</param>
- /// <param name="marshaller">Marshaller.</param>
- /// <typeparam name="T">Object type.</typeparam>
- public void WriteObjectAsBinaryTuple<T>(T obj, IMarshaller<T>? marshaller)
- {
- if (obj == null)
- {
- WriteNil();
- return;
- }
-
- if (marshaller == null)
- {
- WriteObjectAsBinaryTuple(obj);
- return;
- }
-
- using var builder = new BinaryTupleBuilder(3);
- builder.AppendInt((int)ColumnType.ByteArray);
- builder.AppendInt(0); // Scale.
- builder.AppendBytes(
- static (IBufferWriter<byte> writer, (T Obj, IMarshaller<T>
Marshaller) arg) => arg.Marshaller.Marshal(arg.Obj, writer),
- arg: (obj, marshaller));
-
- Write(builder.Build().Span);
- }
-
/// <summary>
/// Writes an object with type code.
/// </summary>
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeMarshallingTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeMarshallingTest.java
index a5835951dc..17a13d56d7 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeMarshallingTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeMarshallingTest.java
@@ -24,11 +24,14 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.ignite.catalog.ColumnType;
import org.apache.ignite.catalog.definitions.TableDefinition;
import org.apache.ignite.compute.JobDescriptor;
+import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobTarget;
import org.apache.ignite.compute.TaskDescriptor;
import org.apache.ignite.internal.runner.app.Jobs.ArgMarshallingJob;
@@ -172,7 +175,7 @@ public class ItThinClientComputeMarshallingTest extends
ItAbstractThinClientTest
}
@Test
- void broadcast() {
+ void executeBroadcast() {
// When.
Map<ClusterNode, String> result = client().compute().executeBroadcast(
Set.of(node(0), node(1)),
@@ -192,6 +195,39 @@ public class ItThinClientComputeMarshallingTest extends
ItAbstractThinClientTest
assertEquals(resultExpected, result);
}
+
+
+ @Test
+ void submitBroadcast() {
+ // When.
+ Map<ClusterNode, String> result = client().compute().submitBroadcast(
+ Set.of(node(0), node(1)),
+ JobDescriptor.builder(ArgumentAndResultMarshallingJob.class)
+ .argumentMarshaller(new ArgumentStringMarshaller())
+ .resultMarshaller(new ResultStringUnMarshaller())
+ .build(),
+ "Input"
+ ).entrySet().stream().collect(
+ Collectors.toMap(Entry::getKey,
ItThinClientComputeMarshallingTest::extractResult, (v, i) -> v)
+ );
+
+ // Then.
+ Map<ClusterNode, String> resultExpected = Map.of(
+ node(0),
"Input:marshalledOnClient:unmarshalledOnServer:processedOnServer:marshalledOnServer:unmarshalledOnClient",
+ node(1),
"Input:marshalledOnClient:unmarshalledOnServer:processedOnServer:marshalledOnServer:unmarshalledOnClient"
+ );
+
+ assertEquals(resultExpected, result);
+ }
+
+ private static String extractResult(Entry<ClusterNode,
JobExecution<String>> e) {
+ try {
+ return e.getValue().resultAsync().get();
+ } catch (InterruptedException | ExecutionException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
@Test
void colocated() {
// Given entry node.
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTypeCheckMarshallingTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTypeCheckMarshallingTest.java
index 017063c709..a12003dc67 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTypeCheckMarshallingTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTypeCheckMarshallingTest.java
@@ -46,6 +46,7 @@ import
org.apache.ignite.internal.runner.app.Jobs.ResultMarshallingJob;
import org.apache.ignite.lang.ErrorGroups.Compute;
import org.apache.ignite.marshalling.ByteArrayMarshaller;
import org.apache.ignite.marshalling.Marshaller;
+import org.apache.ignite.marshalling.UnmarshallingException;
import org.apache.ignite.marshalling.UnsupportedObjectTypeMarshallingException;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
@@ -81,7 +82,7 @@ public class ItThinClientComputeTypeCheckMarshallingTest
extends ItAbstractThinC
);
await().untilAsserted(() -> assertStatusCompleted(result));
- assertThrows(ClassCastException.class, () -> {
+ assertThrows(UnmarshallingException.class, () -> {
String str = getSafe(result.resultAsync());
});
}
@@ -187,8 +188,8 @@ public class ItThinClientComputeTypeCheckMarshallingTest
extends ItAbstractThinC
return fut.get(waitSec, TimeUnit.SECONDS);
} catch (ExecutionException e) {
var cause = e.getCause();
- if (cause instanceof ClassCastException) {
- throw (ClassCastException) cause;
+ if (cause instanceof UnmarshallingException) {
+ throw (UnmarshallingException) cause;
}
throw new RuntimeException(e);
} catch (InterruptedException | TimeoutException e) {
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTupleComputeMarshallingTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTupleComputeMarshallingTest.java
new file mode 100644
index 0000000000..d127c35a93
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTupleComputeMarshallingTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app.client;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.catalog.definitions.ColumnDefinition.column;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import org.apache.ignite.catalog.ColumnType;
+import org.apache.ignite.catalog.definitions.TableDefinition;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.compute.ComputeException;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobDescriptor;
+import org.apache.ignite.compute.JobExecution;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Tuple;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests the out-of-the-box support for {@link Tuple} arguments and results in the Compute API.
+ */
+@SuppressWarnings("resource")
+public class ItThinClientTupleComputeMarshallingTest extends
ItAbstractThinClientTest {
+ static final String TABLE_NAME = "test";
+ IgniteClient client;
+
+ @BeforeEach
+ void setUp() {
+ client = client();
+
+ client.catalog().createTable(
+ TableDefinition.builder(TABLE_NAME)
+ .columns(
+ column("key_col", ColumnType.INT32),
+ column("value_col", ColumnType.VARCHAR)
+ ).primaryKey("key_col")
+ .build()
+ );
+
+ client.tables().table(TABLE_NAME).keyValueView().put(
+ null,
+ Tuple.create().set("key_col", 2),
+ Tuple.create().set("value_col", "hi")
+ );
+ }
+
+ @AfterEach
+ void tearDown() {
+ client.catalog().dropTable(TABLE_NAME);
+ }
+
+ @Test
+ void tupleFromTableApiAsArgument() {
+ // Given tuple from the table.
+ var tup = client.tables().table(TABLE_NAME).keyValueView().get(null,
Tuple.create().set("key_col", 2));
+
+ // When a job is submitted with the tuple as its argument.
+ JobExecution<String> resultJobExec = client.compute().submit(
+ JobTarget.node(node(1)),
+ JobDescriptor.builder(TupleArgJob.class).build(),
+ tup
+ );
+
+ // Then job completes successfully.
+ assertStatusCompleted(resultJobExec);
+ assertThat(
+ getSafe(resultJobExec.resultAsync()),
+ equalTo("hi")
+ );
+ }
+
+ @Test
+ void tupleFromTableReturned() {
+ // Given.
+ var key = 2;
+
+ // When a job that returns a tuple is submitted.
+ JobExecution<Tuple> resultJobExec = client.compute().submit(
+ JobTarget.node(node(1)),
+ JobDescriptor.builder(TupleResultJob.class).build(),
+ key
+ );
+
+ // Then tuple is returned.
+ assertStatusCompleted(resultJobExec);
+ assertThat(
+ getSafe(resultJobExec.resultAsync()).stringValue("value_col"),
+ equalTo("hi")
+ );
+ }
+
+
+ static class TupleResultJob implements ComputeJob<Integer, Tuple> {
+ @Override
+ public @Nullable CompletableFuture<Tuple>
executeAsync(JobExecutionContext context, @Nullable Integer key) {
+ // TODO: the table is not available through context.ignite() here (reason unknown), so a hand-built tuple is returned instead.
+ return completedFuture(Tuple.create().set("value_col", "hi"));
+ }
+ }
+
+ static class TupleArgJob implements ComputeJob<Tuple, String> {
+ @Override
+ public @Nullable CompletableFuture<String>
executeAsync(JobExecutionContext context, @Nullable Tuple arg) {
+ if (arg == null) {
+ return completedFuture("null");
+ }
+
+ return completedFuture(arg.stringValue("value_col"));
+ }
+ }
+
+ private static void assertResultFailsWithErr(int errCode, JobExecution<?>
result) {
+ var ex = assertThrows(CompletionException.class, () ->
result.resultAsync().join());
+ assertThat(ex.getCause(), instanceOf(ComputeException.class));
+ assertThat(((ComputeException) ex.getCause()).code(),
equalTo(errCode));
+ }
+
+ private static void assertStatusFailed(JobExecution<?> result) {
+ var state = getSafe(result.stateAsync());
+ assertThat(state, is(notNullValue()));
+ assertThat(state.status(), equalTo(JobStatus.FAILED));
+ }
+
+ private static void assertStatusCompleted(JobExecution<?> result) {
+ var state = getSafe(result.stateAsync());
+ assertThat(state, is(notNullValue()));
+ assertThat(state.status(), equalTo(JobStatus.COMPLETED));
+ }
+
+ private static <T> T getSafe(@Nullable CompletableFuture<T> fut) {
+ assertThat(fut, is(notNullValue()));
+
+ try {
+ int waitSec = 5;
+ return fut.get(waitSec, TimeUnit.SECONDS);
+ } catch (ExecutionException e) {
+ var cause = e.getCause();
+ if (cause instanceof ClassCastException) {
+ throw (ClassCastException) cause;
+ }
+ throw new RuntimeException(e);
+ } catch (InterruptedException | TimeoutException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private ClusterNode node(int idx) {
+ return sortedNodes().get(idx);
+ }
+
+ private List<ClusterNode> sortedNodes() {
+ return client().clusterNodes().stream()
+ .sorted(Comparator.comparing(ClusterNode::name))
+ .collect(Collectors.toList());
+ }
+}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/compute/ItEmbeddedMarshallingTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/compute/ItEmbeddedMarshallingTest.java
index e344b162a1..2eef341daf 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/compute/ItEmbeddedMarshallingTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/compute/ItEmbeddedMarshallingTest.java
@@ -23,11 +23,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.ignite.catalog.ColumnType;
import org.apache.ignite.catalog.definitions.TableDefinition;
import org.apache.ignite.compute.JobDescriptor;
+import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobTarget;
import org.apache.ignite.internal.runner.app.Jobs.ArgMarshallingJob;
import
org.apache.ignite.internal.runner.app.Jobs.ArgumentAndResultMarshallingJob;
@@ -48,7 +51,7 @@ import org.junit.jupiter.api.Test;
@SuppressWarnings("resource")
public class ItEmbeddedMarshallingTest extends ItAbstractThinClientTest {
@Test
- void embeddedOk() {
+ void embeddedExecOnAnotherNode() {
// Given entry node that are not supposed to execute job.
var node = server(0);
// And another target node.
@@ -70,7 +73,29 @@ public class ItEmbeddedMarshallingTest extends
ItAbstractThinClientTest {
}
@Test
- void broadcast() {
+ void embeddedExecOnSame() {
+ // Given entry node.
+ var node = server(0);
+ // And target node.
+ var targetNode = node(0);
+
+ // When a job is run via the embedded API with custom marshallers for the POJO argument and result.
+ var embeddedCompute = node.compute();
+ PojoResult result = embeddedCompute.execute(
+ JobTarget.node(targetNode),
+ JobDescriptor.builder(PojoJob.class)
+ .argumentMarshaller(new
JsonMarshaller<>(PojoArg.class))
+ .resultMarshaller(new
JsonMarshaller<>(PojoResult.class))
+ .build(),
+ new PojoArg().setIntValue(2).setStrValue("1")
+ );
+
+ // Then the job returns the expected result.
+ assertEquals(3L, result.getLongValue());
+ }
+
+ @Test
+ void broadcastExecute() {
// Given entry node.
var node = server(0);
@@ -86,13 +111,49 @@ public class ItEmbeddedMarshallingTest extends
ItAbstractThinClientTest {
// Then.
Map<ClusterNode, String> resultExpected = Map.of(
- node(0),
"Input:marshalledOnClient:unmarshalledOnServer:processedOnServer:marshalledOnServer:unmarshalledOnClient",
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-23024 - node(0) result lacks ":marshalledOnServer:unmarshalledOnClient".
+ node(0),
"Input:marshalledOnClient:unmarshalledOnServer:processedOnServer",
+ node(1),
"Input:marshalledOnClient:unmarshalledOnServer:processedOnServer:marshalledOnServer:unmarshalledOnClient"
+ );
+
+ assertEquals(resultExpected, result);
+ }
+
+ @Test
+ void broadcastSubmit() {
+ // Given entry node.
+ var node = server(0);
+
+ // When.
+ Map<ClusterNode, String> result = node.compute().submitBroadcast(
+ Set.of(node(0), node(1)),
+ JobDescriptor.builder(ArgumentAndResultMarshallingJob.class)
+ .argumentMarshaller(new ArgumentStringMarshaller())
+ .resultMarshaller(new ResultStringUnMarshaller())
+ .build(),
+ "Input"
+ ).entrySet().stream().collect(
+ Collectors.toMap(Entry::getKey,
ItEmbeddedMarshallingTest::extractResult, (v, i) -> v)
+ );
+
+ // Then.
+ Map<ClusterNode, String> resultExpected = Map.of(
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-23024 - node(0) result lacks ":marshalledOnServer:unmarshalledOnClient".
+ node(0),
"Input:marshalledOnClient:unmarshalledOnServer:processedOnServer",
node(1),
"Input:marshalledOnClient:unmarshalledOnServer:processedOnServer:marshalledOnServer:unmarshalledOnClient"
);
assertEquals(resultExpected, result);
}
+ private static String extractResult(Entry<ClusterNode,
JobExecution<String>> e) {
+ try {
+ return e.getValue().resultAsync().get();
+ } catch (InterruptedException | ExecutionException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
@Test
void colocated() {
// Given entry node.