This is an automated email from the ASF dual-hosted git repository. ptupitsyn pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 39478a01021 IGNITE-25343 Fix marshaller support in Data Streamer API (#5973) 39478a01021 is described below commit 39478a010216f32c0c3cde5a935cc00a19e228b5 Author: Pavel Tupitsyn <ptupit...@apache.org> AuthorDate: Thu Jun 5 08:04:10 2025 +0300 IGNITE-25343 Fix marshaller support in Data Streamer API (#5973) * Add marshallers to `DataStreamerReceiver`: `payloadMarshaller`, `argumentMarshaller`, `resultMarshaller` * Add `DataStreamerReceiverDescriptor<T, A, R>`: * Same generic type parameters as `DataStreamerReceiver<T, A, R>` * Has `payloadMarshaller`, `argumentMarshaller`, `resultMarshaller` * Deprecate `ReceiverDescriptor<A>` * Add new `DataStreamerTarget#streamData` with `DataStreamerReceiverDescriptor` and deprecate the old one --- .../apache/ignite/table/DataStreamerReceiver.java | 32 +++- ...or.java => DataStreamerReceiverDescriptor.java} | 112 ++++++++++-- .../apache/ignite/table/DataStreamerTarget.java | 53 +++++- .../apache/ignite/table/ReceiverDescriptor.java | 3 + .../table/DataStreamerReceiverDescriptorTest.java | 70 ++++++++ .../client/proto/StreamerReceiverSerializer.java | 155 ++++++++++++---- .../internal/client/table/ClientDataStreamer.java | 12 +- .../client/table/ClientKeyValueBinaryView.java | 11 +- .../internal/client/table/ClientKeyValueView.java | 11 +- .../client/table/ClientRecordBinaryView.java | 11 +- .../internal/client/table/ClientRecordView.java | 10 +- .../client/table/api/PublicApiClientViewBase.java | 16 +- .../ignite/client/fakes/FakeInternalTable.java | 10 +- .../ignite/internal/compute/IgniteComputeImpl.java | 9 +- .../compute/streamer/StreamerReceiverJob.java | 16 +- .../streamer/ItAbstractDataStreamerTest.java | 200 +++++++++++++++++++-- .../internal/restart/RestartProofKeyValueView.java | 13 +- .../internal/restart/RestartProofRecordView.java | 13 +- .../table/AsyncApiKeyValueViewAdapter.java | 9 +- .../internal/table/AsyncApiRecordViewAdapter.java | 10 +- .../internal/table/KeyValueBinaryViewImpl.java | 10 +- .../ignite/internal/table/KeyValueViewImpl.java | 10 +- .../internal/table/PublicApiThreadingViewBase.java | 16 +- .../internal/table/RecordBinaryViewImpl.java | 10 +- .../ignite/internal/table/RecordViewImpl.java | 12 +- .../internal/table/StreamerReceiverRunner.java | 4 +- 26 files changed, 664 insertions(+), 174 deletions(-) diff --git a/modules/api/src/main/java/org/apache/ignite/table/DataStreamerReceiver.java b/modules/api/src/main/java/org/apache/ignite/table/DataStreamerReceiver.java index 899c67ec2ce..443181dd419 100644 --- a/modules/api/src/main/java/org/apache/ignite/table/DataStreamerReceiver.java +++ b/modules/api/src/main/java/org/apache/ignite/table/DataStreamerReceiver.java @@ -22,6 +22,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Flow.Subscriber; import java.util.function.Function; +import org.apache.ignite.marshalling.Marshaller; import org.jetbrains.annotations.Nullable; /** @@ -34,8 +35,8 @@ import org.jetbrains.annotations.Nullable; @SuppressWarnings("InterfaceMayBeAnnotatedFunctional") public interface DataStreamerReceiver<T, A, R> { /** - * Receives an item from the data streamer (see {@link DataStreamerTarget#streamData(Publisher, Function, Function, - * ReceiverDescriptor, Subscriber, DataStreamerOptions, Object)}). + * Receives an item from the data streamer (see {@link DataStreamerTarget#streamData(Publisher, DataStreamerReceiverDescriptor, + * Function, Function, Object, Subscriber, DataStreamerOptions)}). * * <p>The receiver is called for each page (batch) in the data streamer and is responsible for processing the items, * updating zero or more tables, and returning a result. @@ -49,4 +50,31 @@ public interface DataStreamerReceiver<T, A, R> { List<T> page, DataStreamerReceiverContext ctx, @Nullable A arg); + + /** + * Marshaller for the receiver payload (batch items). + * + * @return Payload marshaller. + */ + default @Nullable Marshaller<T, byte[]> payloadMarshaller() { + return null; + } + + /** + * Marshaller for the input argument. + * + * @return Input marshaller. + */ + default @Nullable Marshaller<A, byte[]> argumentMarshaller() { + return null; + } + + /** + * Marshaller for the job result. + * + * @return Result marshaller. + */ + default @Nullable Marshaller<R, byte[]> resultMarshaller() { + return null; + } } diff --git a/modules/api/src/main/java/org/apache/ignite/table/ReceiverDescriptor.java b/modules/api/src/main/java/org/apache/ignite/table/DataStreamerReceiverDescriptor.java similarity index 53% copy from modules/api/src/main/java/org/apache/ignite/table/ReceiverDescriptor.java copy to modules/api/src/main/java/org/apache/ignite/table/DataStreamerReceiverDescriptor.java index fb9e4f8821e..ff0077049e3 100644 --- a/modules/api/src/main/java/org/apache/ignite/table/ReceiverDescriptor.java +++ b/modules/api/src/main/java/org/apache/ignite/table/DataStreamerReceiverDescriptor.java @@ -26,20 +26,26 @@ import org.jetbrains.annotations.Nullable; /** * Data streamer receiver descriptor. */ -public class ReceiverDescriptor<A> { +public class DataStreamerReceiverDescriptor<T, A, R> { private final String receiverClassName; private final List<DeploymentUnit> units; private final ReceiverExecutionOptions options; + private final @Nullable Marshaller<T, byte[]> payloadMarshaller; + private final @Nullable Marshaller<A, byte[]> argumentMarshaller; - private ReceiverDescriptor( + private final @Nullable Marshaller<R, byte[]> resultMarshaller; + + private DataStreamerReceiverDescriptor( String receiverClassName, List<DeploymentUnit> units, ReceiverExecutionOptions options, - @Nullable Marshaller<A, byte[]> argumentMarshaller + @Nullable Marshaller<T, byte[]> payloadMarshaller, + @Nullable Marshaller<A, byte[]> argumentMarshaller, + @Nullable Marshaller<R, byte[]> resultMarshaller ) { Objects.requireNonNull(receiverClassName); Objects.requireNonNull(units); @@ -47,7 +53,9 @@ public class ReceiverDescriptor<A> { this.receiverClassName = receiverClassName; this.units = units; this.options = options; + this.payloadMarshaller = payloadMarshaller; this.argumentMarshaller = argumentMarshaller; + this.resultMarshaller = resultMarshaller; } /** @@ -77,12 +85,39 @@ public class ReceiverDescriptor<A> { return options; } + /** + * Payload marshaller. + * + * @return Payload marshaller. + */ + public @Nullable Marshaller<T, byte[]> payloadMarshaller() { + return payloadMarshaller; + } + + /** + * Argument marshaller. + * + * @return Argument marshaller. + */ + public @Nullable Marshaller<A, byte[]> argumentMarshaller() { + return argumentMarshaller; + } + + /** + * Result marshaller. + * + * @return Result marshaller. + */ + public @Nullable Marshaller<R, byte[]> resultMarshaller() { + return resultMarshaller; + } + /** * Create a new builder. * * @return Receiver descriptor builder. */ - public static <A> Builder<A> builder(String receiverClassName) { + public static <T, A, R> Builder<T, A, R> builder(String receiverClassName) { Objects.requireNonNull(receiverClassName); return new Builder<>(receiverClassName); @@ -93,23 +128,36 @@ public class ReceiverDescriptor<A> { * * @return Receiver descriptor builder. */ - public static <A> Builder<A> builder(Class<? extends DataStreamerReceiver<?, A, ?>> receiverClass) { + public static <T, A, R> Builder<T, A, R> builder(Class<? extends DataStreamerReceiver<T, A, R>> receiverClass) { Objects.requireNonNull(receiverClass); return new Builder<>(receiverClass.getName()); } - public @Nullable Marshaller<A, byte[]> argumentMarshaller() { - return argumentMarshaller; + /** + * Create a new builder from a receiver instance. + * Populates {@link #payloadMarshaller()}, {@link #argumentMarshaller()}, and {@link #resultMarshaller()} from the provided receiver. + * + * @return Receiver descriptor builder. + */ + public static <T, A, R> Builder<T, A, R> builder(DataStreamerReceiver<T, A, R> receiver) { + Objects.requireNonNull(receiver); + + return new Builder<T, A, R>(receiver.getClass().getName()) + .payloadMarshaller(receiver.payloadMarshaller()) + .argumentMarshaller(receiver.argumentMarshaller()) + .resultMarshaller(receiver.resultMarshaller()); } /** * Builder. */ - public static class Builder<A> { + public static class Builder<T, A, R> { private final String receiverClassName; private List<DeploymentUnit> units; + private @Nullable Marshaller<T, byte[]> payloadMarshaller; private @Nullable Marshaller<A, byte[]> argumentMarshaller; + private @Nullable Marshaller<R, byte[]> resultMarshaller; private ReceiverExecutionOptions options = ReceiverExecutionOptions.DEFAULT; private Builder(String receiverClassName) { @@ -124,7 +172,7 @@ public class ReceiverDescriptor<A> { * @param units Deployment units. * @return This builder. */ - public Builder<A> units(List<DeploymentUnit> units) { + public Builder<T, A, R> units(List<DeploymentUnit> units) { this.units = units; return this; } @@ -135,7 +183,7 @@ public class ReceiverDescriptor<A> { * @param units Deployment units. * @return This builder. */ - public Builder<A> units(DeploymentUnit... units) { + public Builder<T, A, R> units(DeploymentUnit... units) { this.units = List.of(units); return this; } @@ -146,13 +194,41 @@ public class ReceiverDescriptor<A> { * @param options Receiver execution options. * @return This builder. */ - public Builder<A> options(ReceiverExecutionOptions options) { + public Builder<T, A, R> options(ReceiverExecutionOptions options) { this.options = options; return this; } - public Builder<A> argumentMarshaller(@Nullable Marshaller<A, byte[]> argumentsMarshaller) { - this.argumentMarshaller = argumentsMarshaller; + /** + * Sets the payload marshaller. + * + * @param payloadMarshaller Payload marshaller. + * @return This builder. + */ + public Builder<T, A, R> payloadMarshaller(@Nullable Marshaller<T, byte[]> payloadMarshaller) { + this.payloadMarshaller = payloadMarshaller; + return this; + } + + /** + * Sets the argument marshaller. + * + * @param argumentMarshaller Argument marshaller. + * @return This builder. + */ + public Builder<T, A, R> argumentMarshaller(@Nullable Marshaller<A, byte[]> argumentMarshaller) { + this.argumentMarshaller = argumentMarshaller; + return this; + } + + /** + * Sets the result marshaller. + * + * @param resultMarshaller Result marshaller. + * @return This builder. + */ + public Builder<T, A, R> resultMarshaller(@Nullable Marshaller<R, byte[]> resultMarshaller) { + this.resultMarshaller = resultMarshaller; return this; } @@ -161,12 +237,14 @@ public class ReceiverDescriptor<A> { * * @return Receiver descriptor. */ - public ReceiverDescriptor<A> build() { - return new ReceiverDescriptor<>( + public DataStreamerReceiverDescriptor<T, A, R> build() { + return new DataStreamerReceiverDescriptor<>( receiverClassName, - units == null ? List.of() : units, + units != null ? units : List.of(), options, - argumentMarshaller + payloadMarshaller, + argumentMarshaller, + resultMarshaller ); } } diff --git a/modules/api/src/main/java/org/apache/ignite/table/DataStreamerTarget.java b/modules/api/src/main/java/org/apache/ignite/table/DataStreamerTarget.java index c7f927965ff..df429e206a3 100644 --- a/modules/api/src/main/java/org/apache/ignite/table/DataStreamerTarget.java +++ b/modules/api/src/main/java/org/apache/ignite/table/DataStreamerTarget.java @@ -17,6 +17,7 @@ package org.apache.ignite.table; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Flow; import java.util.function.Function; @@ -50,18 +51,64 @@ public interface DataStreamerTarget<T> { * from {@link Flow.Subscription#request(long)} calls. * @param options Options (can be null). * @param receiverArg Receiver arguments. - * @return Future that will be completed when the stream is finished. * @param <E> Producer item type. * @param <V> Payload type. * @param <R> Result type. * @param <A> Receiver job argument type. + * @return Future that will be completed when the stream is finished. + * @deprecated Use {@link #streamData(Flow.Publisher, DataStreamerReceiverDescriptor, Function, Function, Object, Flow.Subscriber, + * DataStreamerOptions)}. */ - <E, V, R, A> CompletableFuture<Void> streamData( + @Deprecated + default <E, V, R, A> CompletableFuture<Void> streamData( Flow.Publisher<E> publisher, Function<E, T> keyFunc, Function<E, V> payloadFunc, ReceiverDescriptor<A> receiver, @Nullable Flow.Subscriber<R> resultSubscriber, @Nullable DataStreamerOptions options, - @Nullable A receiverArg); + @Nullable A receiverArg) { + Objects.requireNonNull(receiver); + + DataStreamerReceiverDescriptor<V, A, R> desc = DataStreamerReceiverDescriptor + .<V, A, R>builder(receiver.receiverClassName()) + .units(receiver.units()) + .options(receiver.options()) + .build(); + + return streamData( + publisher, + desc, + keyFunc, + payloadFunc, + receiverArg, + resultSubscriber, + options); + } + + /** + * Streams data with receiver. The receiver is responsible for processing the data and updating zero or more tables. + * + * @param publisher Producer. + * @param keyFunc Key function. The key is only used locally for colocation. + * @param payloadFunc Payload function. The payload is sent to the receiver. + * @param resultSubscriber Optional subscriber for the receiver results. + * NOTE: The result subscriber follows the pace of publisher and ignores backpressure + * from {@link Flow.Subscription#request(long)} calls. + * @param options Options (can be null). + * @param receiverArg Receiver arguments. + * @return Future that will be completed when the stream is finished. + * @param <E> Producer item type. + * @param <V> Payload type. + * @param <A> Receiver job argument type. + * @param <R> Result type. + */ + <E, V, A, R> CompletableFuture<Void> streamData( + Flow.Publisher<E> publisher, + DataStreamerReceiverDescriptor<V, A, R> receiver, + Function<E, T> keyFunc, + Function<E, V> payloadFunc, + @Nullable A receiverArg, + @Nullable Flow.Subscriber<R> resultSubscriber, + @Nullable DataStreamerOptions options); } diff --git a/modules/api/src/main/java/org/apache/ignite/table/ReceiverDescriptor.java b/modules/api/src/main/java/org/apache/ignite/table/ReceiverDescriptor.java index fb9e4f8821e..44c6afed64c 100644 --- a/modules/api/src/main/java/org/apache/ignite/table/ReceiverDescriptor.java +++ b/modules/api/src/main/java/org/apache/ignite/table/ReceiverDescriptor.java @@ -25,7 +25,10 @@ import org.jetbrains.annotations.Nullable; /** * Data streamer receiver descriptor. + * + * @deprecated Replaced by {@link DataStreamerReceiverDescriptor}. */ +@Deprecated public class ReceiverDescriptor<A> { private final String receiverClassName; diff --git a/modules/api/src/test/java/org/apache/ignite/table/DataStreamerReceiverDescriptorTest.java b/modules/api/src/test/java/org/apache/ignite/table/DataStreamerReceiverDescriptorTest.java new file mode 100644 index 00000000000..c4701a2b84a --- /dev/null +++ b/modules/api/src/test/java/org/apache/ignite/table/DataStreamerReceiverDescriptorTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.table; + +import static org.junit.jupiter.api.Assertions.assertInstanceOf; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.marshalling.ByteArrayMarshaller; +import org.apache.ignite.marshalling.Marshaller; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.Test; + +/** + * Data streamer receiver descriptor test. + */ +public class DataStreamerReceiverDescriptorTest { + @Test + void testReceiverDescriptorBuilderPopulatesMarshallers() { + DataStreamerReceiverDescriptor<Integer, String, UUID> desc = DataStreamerReceiverDescriptor.builder(new TestReceiver()).build(); + + assertInstanceOf(IntMarshaller.class, desc.payloadMarshaller()); + assertInstanceOf(StrMarshaller.class, desc.argumentMarshaller()); + assertInstanceOf(UuidMarshaller.class, desc.resultMarshaller()); + } + + private static class IntMarshaller implements ByteArrayMarshaller<Integer> { } + + private static class StrMarshaller implements ByteArrayMarshaller<String> { } + + private static class UuidMarshaller implements ByteArrayMarshaller<UUID> { } + + private static class TestReceiver implements DataStreamerReceiver<Integer, String, UUID> { + @Override + public @Nullable CompletableFuture<List<UUID>> receive(List<Integer> page, DataStreamerReceiverContext ctx, @Nullable String arg) { + return null; + } + + @Override + public @Nullable Marshaller<Integer, byte[]> payloadMarshaller() { + return new IntMarshaller(); + } + + @Override + public @Nullable Marshaller<String, byte[]> argumentMarshaller() { + return new StrMarshaller(); + } + + @Override + public @Nullable Marshaller<UUID, byte[]> resultMarshaller() { + return new UuidMarshaller(); + } + } +} diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/StreamerReceiverSerializer.java b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/StreamerReceiverSerializer.java index 621831854dd..771ffc0d1d6 100644 --- a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/StreamerReceiverSerializer.java +++ b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/StreamerReceiverSerializer.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.client.proto; import static org.apache.ignite.internal.client.proto.ClientBinaryTupleUtils.unsupportedTypeException; import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR; +import static org.apache.ignite.lang.ErrorGroups.Compute.MARSHALLING_TYPE_MISMATCH_ERR; import java.math.BigDecimal; import java.nio.ByteBuffer; @@ -40,9 +41,11 @@ import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder; import org.apache.ignite.internal.binarytuple.BinaryTupleReader; import org.apache.ignite.internal.binarytuple.inlineschema.TupleWithSchemaMarshalling; import org.apache.ignite.lang.IgniteException; +import org.apache.ignite.lang.MarshallerException; import org.apache.ignite.marshalling.Marshaller; import org.apache.ignite.sql.ColumnType; -import org.apache.ignite.table.ReceiverDescriptor; +import org.apache.ignite.table.DataStreamerReceiver; +import org.apache.ignite.table.DataStreamerReceiverDescriptor; import org.apache.ignite.table.Tuple; import org.jetbrains.annotations.Nullable; @@ -64,19 +67,20 @@ public class StreamerReceiverSerializer { * @param receiverArg Receiver arguments. * @param items Items. */ - public static <A> void serializeReceiverInfoOnClient( + public static <T, A> void serializeReceiverInfoOnClient( ClientMessagePacker w, String receiverClassName, - A receiverArg, - @Nullable Marshaller<A, byte[]> receiverArgsMarshaller, - Collection<?> items) { + @Nullable A receiverArg, + @Nullable Marshaller<T, byte[]> itemsMarshaller, + @Nullable Marshaller<A, byte[]> receiverArgMarshaller, + Collection<T> items) { // className + arg + items size + item type + items. int binaryTupleSize = 1 + 3 + 1 + 1 + items.size(); var builder = new BinaryTupleBuilder(binaryTupleSize); builder.appendString(receiverClassName); - appendArg(builder, receiverArg); - appendCollectionToBinaryTuple(builder, items); + appendArg(builder, receiverArg, receiverArgMarshaller); + appendCollectionToBinaryTuple(builder, items, itemsMarshaller); w.packInt(binaryTupleSize); w.packBinaryTuple(builder); @@ -89,17 +93,19 @@ public class StreamerReceiverSerializer { * @param receiverArg Receiver arguments. * @param items Items. */ - public static <A> byte[] serializeReceiverInfoWithElementCount( - ReceiverDescriptor<A> receiver, + public static <T, A, R> byte[] serializeReceiverInfoWithElementCount( + DataStreamerReceiverDescriptor<T, A, R> receiver, @Nullable A receiverArg, - Collection<?> items) { + @Nullable Marshaller<T, byte[]> itemsMarshaller, + @Nullable Marshaller<A, byte[]> receiverArgMarshaller, + Collection<T> items) { // className + arg + items size + item type + items. int binaryTupleSize = 1 + 3 + 1 + 1 + items.size(); var builder = new BinaryTupleBuilder(binaryTupleSize); builder.appendString(receiver.receiverClassName()); - appendArg(builder, receiverArg); - appendCollectionToBinaryTuple(builder, items); + appendArg(builder, receiverArg, receiverArgMarshaller); + appendCollectionToBinaryTuple(builder, items, itemsMarshaller); ByteBuffer buf = builder.build(); int bufSize = buf.limit() - buf.position(); @@ -116,9 +122,13 @@ public class StreamerReceiverSerializer { * * @param bytes Bytes. * @param elementCount Number of elements in the binary tuple. + * @param receiverFactory Function to create a receiver instance from its class name. * @return Streamer receiver info. */ - public static SteamerReceiverInfo deserializeReceiverInfo(ByteBuffer bytes, int elementCount) { + public static SteamerReceiverInfo deserializeReceiverInfo( + ByteBuffer bytes, + int elementCount, + Function<String, DataStreamerReceiver<Object, Object, Object>> receiverFactory) { var reader = new BinaryTupleReader(elementCount, bytes); int readerIndex = 0; @@ -128,13 +138,15 @@ public class StreamerReceiverSerializer { throw new IgniteException(PROTOCOL_ERR, "Receiver class name is null"); } - Object receiverArg = readArg(reader, readerIndex); + DataStreamerReceiver<Object, Object, Object> receiver = receiverFactory.apply(receiverClassName); + + Object receiverArg = readArg(reader, readerIndex, receiver.argumentMarshaller()); readerIndex += 3; - List<Object> items = readCollectionFromBinaryTuple(reader, readerIndex); + List<Object> items = readCollectionFromBinaryTuple(reader, readerIndex, receiver.payloadMarshaller()); - return new SteamerReceiverInfo(receiverClassName, receiverArg, items); + return new SteamerReceiverInfo(receiver, receiverArg, items); } /** @@ -142,14 +154,16 @@ public class StreamerReceiverSerializer { * * @param receiverResults Receiver results. */ - public static byte @Nullable [] serializeReceiverJobResults(@Nullable List<Object> receiverResults) { + public static <T> byte @Nullable [] serializeReceiverJobResults( + @Nullable List<T> receiverResults, + @Nullable Marshaller<T, byte[]> resultsMarshaller) { if (receiverResults == null || receiverResults.isEmpty()) { return null; } int numElements = 2 + receiverResults.size(); var builder = new BinaryTupleBuilder(numElements); - appendCollectionToBinaryTuple(builder, receiverResults); + appendCollectionToBinaryTuple(builder, receiverResults, resultsMarshaller); ByteBuffer res = builder.build(); @@ -172,7 +186,9 @@ public class StreamerReceiverSerializer { * @param results Serialized results. * @return Deserialized results. */ - public static <R> List<R> deserializeReceiverJobResults(byte[] results) { + public static <R> List<R> deserializeReceiverJobResults( + byte[] results, + @Nullable Marshaller<R, byte[]> resultsMarshaller) { if (results == null || results.length == 0) { return List.of(); } @@ -182,7 +198,7 @@ public class StreamerReceiverSerializer { var reader = new BinaryTupleReader(numElements, buf.slice().order(ByteOrder.LITTLE_ENDIAN)); - return readCollectionFromBinaryTuple(reader, 0); + return readCollectionFromBinaryTuple(reader, 0, resultsMarshaller); } /** @@ -213,7 +229,9 @@ public class StreamerReceiverSerializer { * @param r Reader. * @return Receiver results. */ - public static @Nullable <R> List<R> deserializeReceiverResultsOnClient(ClientMessageUnpacker r) { + public static @Nullable <R> List<R> deserializeReceiverResultsOnClient( + ClientMessageUnpacker r, + @Nullable Marshaller<R, byte[]> resultsMarshaller) { if (r.tryUnpackNil()) { return null; } @@ -222,7 +240,7 @@ public class StreamerReceiverSerializer { byte[] bytes = r.readBinary(); var reader = new BinaryTupleReader(numElements, bytes); - return readCollectionFromBinaryTuple(reader, 0); + return readCollectionFromBinaryTuple(reader, 0, resultsMarshaller); } /** @@ -231,11 +249,26 @@ public class StreamerReceiverSerializer { * @param builder Target builder. * @param items Items. */ - private static <T> void appendCollectionToBinaryTuple(BinaryTupleBuilder builder, Collection<T> items) { + private static <T> void appendCollectionToBinaryTuple( + BinaryTupleBuilder builder, + Collection<T> items, + @Nullable Marshaller<T, byte[]> itemsMarshaller) { assert items != null : "items can't be null"; assert !items.isEmpty() : "items can't be empty"; assert builder != null : "builder can't be null"; + if (itemsMarshaller != null) { + builder.appendInt(ColumnType.BYTE_ARRAY.id()); + builder.appendInt(items.size()); + + for (T item : items) { + byte[] bytes = itemsMarshaller.marshal(item); + builder.appendBytes(bytes); + } + + return; + } + T firstItem = items.iterator().next(); Objects.requireNonNull(firstItem); Class<?> type = firstItem.getClass(); @@ -254,14 +287,23 @@ public class StreamerReceiverSerializer { } } - private static <R> List<R> readCollectionFromBinaryTuple(BinaryTupleReader reader, int readerIndex) { + private static <R> List<R> readCollectionFromBinaryTuple( + BinaryTupleReader reader, + int readerIndex, + @Nullable Marshaller<R, byte[]> itemsMarshaller) { int typeId = reader.intValue(readerIndex++); Function<Integer, Object> itemReader = readerForType(reader, typeId); int itemsCount = reader.intValue(readerIndex++); List<R> items = new ArrayList<>(itemsCount); for (int i = 0; i < itemsCount; i++) { - items.add((R) itemReader.apply(readerIndex++)); + Object itemRaw = itemReader.apply(readerIndex++); + + R item = itemsMarshaller == null + ? (R) itemRaw + : unmarshalBytes(itemsMarshaller, itemRaw); + + items.add(item); } return items; @@ -399,7 +441,16 @@ public class StreamerReceiverSerializer { } } - private static <T> void appendArg(BinaryTupleBuilder builder, @Nullable T arg) { + private static <T> void appendArg( + BinaryTupleBuilder builder, + @Nullable T arg, + @Nullable Marshaller<T, byte[]> receiverArgMarshaller) { + if (receiverArgMarshaller != null) { + byte[] bytes = receiverArgMarshaller.marshal(arg); + ClientBinaryTupleUtils.appendObject(builder, bytes); + return; + } + if (arg instanceof Tuple) { builder.appendInt(TupleWithSchemaMarshalling.TYPE_ID_TUPLE); builder.appendInt(0); // Scale. @@ -411,18 +462,50 @@ public class StreamerReceiverSerializer { ClientBinaryTupleUtils.appendObject(builder, arg); } - private static @Nullable Object readArg(BinaryTupleReader reader, int index) { + private static @Nullable Object readArg( + BinaryTupleReader reader, + int index, + @Nullable Marshaller<Object, byte[]> receiverArgMarshaller) { if (reader.hasNullValue(index)) { - return null; + return receiverArgMarshaller == null + ? null + : receiverArgMarshaller.unmarshal(null); } if (reader.intValue(index) == TupleWithSchemaMarshalling.TYPE_ID_TUPLE) { return readTuple(reader, index + 2); } - return ClientBinaryTupleUtils.readObject(reader, index); + var obj = ClientBinaryTupleUtils.readObject(reader, index); + + return receiverArgMarshaller == null + ? obj + : unmarshalBytes(receiverArgMarshaller, obj); } + private static <T> @Nullable T unmarshalBytes(Marshaller<T, byte[]> marshaller, @Nullable Object input) { + try { + if (input instanceof byte[]) { + return marshaller.unmarshal((byte[]) input); + } else if (input == null) { + return marshaller.unmarshal(null); + } + } catch (Exception ex) { + throw new MarshallerException( + UUID.randomUUID(), MARSHALLING_TYPE_MISMATCH_ERR, "Exception in user-defined marshaller: " + ex.getMessage(), ex); + } + + throw new MarshallerException( + UUID.randomUUID(), + MARSHALLING_TYPE_MISMATCH_ERR, + "Marshaller is defined in the DataStreamerReceiver implementation, " + + "expected argument type: `byte[]`, actual: `" + input.getClass() + "`. " + + "Ensure that DataStreamerReceiverDescriptor marshallers match DataStreamerReceiver marshallers.", + null + ); + } + + private static <T> void appendTuple(BinaryTupleBuilder builder, Tuple arg) { builder.appendBytes(TupleWithSchemaMarshalling.marshal(arg)); } @@ -436,23 +519,23 @@ public class StreamerReceiverSerializer { * Streamer receiver info. */ public static class SteamerReceiverInfo { - private final String className; + private final DataStreamerReceiver<Object, Object, Object> receiver; private final @Nullable Object arg; private final List<Object> items; - private SteamerReceiverInfo(String className, @Nullable Object arg, List<Object> items) { - this.className = className; + private SteamerReceiverInfo(DataStreamerReceiver<Object, Object, Object> receiver, @Nullable Object arg, List<Object> items) { + this.receiver = receiver; this.arg = arg; this.items = items; } /** - * Gets receiver class name. + * Get receiver instance. * - * @return Receiver class name. + * @return Receiver instance. */ - public String className() { - return className; + public DataStreamerReceiver<Object, Object, Object> receiver() { + return receiver; } /** diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java index a4300b7f014..b0af543f8df 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java @@ -34,7 +34,7 @@ import org.apache.ignite.internal.streamer.StreamerSubscriber; import org.apache.ignite.table.DataStreamerItem; import org.apache.ignite.table.DataStreamerOperationType; import org.apache.ignite.table.DataStreamerOptions; -import org.apache.ignite.table.ReceiverDescriptor; +import org.apache.ignite.table.DataStreamerReceiverDescriptor; import org.apache.ignite.table.ReceiverExecutionOptions; import org.jetbrains.annotations.Nullable; @@ -71,8 +71,8 @@ class ClientDataStreamer { StreamerPartitionAwarenessProvider<T, Integer> partitionAwarenessProvider, ClientTable tbl, @Nullable Flow.Subscriber<R> resultSubscriber, - ReceiverDescriptor<A> receiverDescriptor, - A receiverArgs + DataStreamerReceiverDescriptor<V, A, R> receiverDescriptor, + @Nullable A receiverArg ) { var opts = receiverDescriptor.options(); if (opts != null && !opts.equals(ReceiverExecutionOptions.DEFAULT)) { @@ -96,12 +96,14 @@ class ClientDataStreamer { StreamerReceiverSerializer.serializeReceiverInfoOnClient( w, receiverDescriptor.receiverClassName(), - receiverArgs, + receiverArg, + receiverDescriptor.payloadMarshaller(), receiverDescriptor.argumentMarshaller(), items); }, in -> resultSubscriber != null - ? StreamerReceiverSerializer.deserializeReceiverResultsOnClient(in.in()) + ? StreamerReceiverSerializer.deserializeReceiverResultsOnClient( + in.in(), receiverDescriptor.resultMarshaller()) : null, partitionAssignment.get(partitionId), new RetryLimitPolicy().retryLimit(options.retryLimit()), diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueBinaryView.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueBinaryView.java index b253a15a14a..799640dfe1d 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueBinaryView.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueBinaryView.java @@ -44,8 +44,8 @@ import org.apache.ignite.sql.ResultSetMetadata; import org.apache.ignite.sql.SqlRow; import org.apache.ignite.table.DataStreamerItem; import org.apache.ignite.table.DataStreamerOptions; +import org.apache.ignite.table.DataStreamerReceiverDescriptor; import org.apache.ignite.table.KeyValueView; -import org.apache.ignite.table.ReceiverDescriptor; import org.apache.ignite.table.Tuple; import org.apache.ignite.tx.Transaction; import org.jetbrains.annotations.Nullable; @@ -538,16 +538,15 @@ public class ClientKeyValueBinaryView extends AbstractClientView<Entry<Tuple, Tu return ClientDataStreamer.streamData(publisher, opts, batchSender, provider, tbl); } - /** {@inheritDoc} */ @Override - public <E, V, R, A> CompletableFuture<Void> streamData( + public <E, V, A, R> CompletableFuture<Void> streamData( Publisher<E> publisher, + DataStreamerReceiverDescriptor<V, A, R> receiver, Function<E, Entry<Tuple, Tuple>> keyFunc, Function<E, V> payloadFunc, - ReceiverDescriptor<A> receiver, + @Nullable A receiverArg, @Nullable Flow.Subscriber<R> resultSubscriber, - @Nullable DataStreamerOptions options, - A receiverArg) { + @Nullable DataStreamerOptions options) { Objects.requireNonNull(publisher); Objects.requireNonNull(keyFunc); Objects.requireNonNull(payloadFunc); diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java index 165d8cc1917..81356fd7132 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java @@ -62,8 +62,8 @@ import org.apache.ignite.sql.ResultSetMetadata; import org.apache.ignite.sql.SqlRow; import org.apache.ignite.table.DataStreamerItem; import org.apache.ignite.table.DataStreamerOptions; +import org.apache.ignite.table.DataStreamerReceiverDescriptor; import org.apache.ignite.table.KeyValueView; -import org.apache.ignite.table.ReceiverDescriptor; import org.apache.ignite.table.mapper.Mapper; import org.apache.ignite.tx.Transaction; import org.jetbrains.annotations.Nullable; @@ -727,16 +727,15 @@ public class ClientKeyValueView<K, V> extends AbstractClientView<Entry<K, V>> im return ClientDataStreamer.streamData(publisher, opts, batchSender, provider, tbl); } - /** {@inheritDoc} */ @Override - public <E, P, R, A> CompletableFuture<Void> streamData( + public <E, P, A, R> CompletableFuture<Void> streamData( Publisher<E> publisher, + DataStreamerReceiverDescriptor<P, A, R> receiver, Function<E, Entry<K, V>> keyFunc, Function<E, P> payloadFunc, - ReceiverDescriptor<A> receiver, + @Nullable A receiverArg, @Nullable Flow.Subscriber<R> resultSubscriber, - @Nullable DataStreamerOptions options, - A receiverArg) { + @Nullable DataStreamerOptions options) { Objects.requireNonNull(publisher); Objects.requireNonNull(keyFunc); Objects.requireNonNull(payloadFunc); diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java index ef7f75c5951..b7de009fb6c 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java @@ -38,7 +38,7 @@ import org.apache.ignite.internal.client.sql.ClientSql; import org.apache.ignite.internal.streamer.StreamerBatchSender; import org.apache.ignite.table.DataStreamerItem; import org.apache.ignite.table.DataStreamerOptions; -import org.apache.ignite.table.ReceiverDescriptor; +import org.apache.ignite.table.DataStreamerReceiverDescriptor; import org.apache.ignite.table.RecordView; import org.apache.ignite.table.Tuple; import org.apache.ignite.tx.Transaction; @@ -453,16 +453,15 @@ public class ClientRecordBinaryView extends AbstractClientView<Tuple> implements return ClientDataStreamer.streamData(publisher, opts, batchSender, provider, tbl); } - /** {@inheritDoc} */ @Override - public <E, V, R, A> CompletableFuture<Void> streamData( + public <E, V, A, R> CompletableFuture<Void> streamData( Publisher<E> publisher, + DataStreamerReceiverDescriptor<V, A, R> receiver, Function<E, Tuple> keyFunc, Function<E, V> payloadFunc, - ReceiverDescriptor<A> receiver, + @Nullable A receiverArg, @Nullable Flow.Subscriber<R> resultSubscriber, - @Nullable DataStreamerOptions options, - A receiverArg) { + @Nullable DataStreamerOptions options) { Objects.requireNonNull(publisher); Objects.requireNonNull(keyFunc); Objects.requireNonNull(payloadFunc); diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java index c0b0484f965..630e679e410 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java @@ -44,7 +44,7 @@ import org.apache.ignite.sql.ResultSetMetadata; import org.apache.ignite.sql.SqlRow; import org.apache.ignite.table.DataStreamerItem; import org.apache.ignite.table.DataStreamerOptions; -import org.apache.ignite.table.ReceiverDescriptor; +import org.apache.ignite.table.DataStreamerReceiverDescriptor; import org.apache.ignite.table.RecordView; import org.apache.ignite.table.mapper.Mapper; import org.apache.ignite.tx.Transaction; @@ -457,14 +457,14 @@ public class ClientRecordView<R> extends AbstractClientView<R> implements Record } @Override - public <E, V, R1, A> CompletableFuture<Void> streamData( + public <E, V, A, R1> CompletableFuture<Void> streamData( Publisher<E> publisher, + DataStreamerReceiverDescriptor<V, A, R1> receiver, Function<E, R> keyFunc, Function<E, V> payloadFunc, - ReceiverDescriptor<A> receiver, + @Nullable A receiverArg, @Nullable Flow.Subscriber<R1> resultSubscriber, - @Nullable DataStreamerOptions options, - A receiverArg) { + @Nullable DataStreamerOptions options) { Objects.requireNonNull(publisher); Objects.requireNonNull(keyFunc); Objects.requireNonNull(payloadFunc); diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/api/PublicApiClientViewBase.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/api/PublicApiClientViewBase.java index d9e2052b3bf..97bc391a935 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/api/PublicApiClientViewBase.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/api/PublicApiClientViewBase.java @@ -27,8 +27,8 @@ import org.apache.ignite.lang.AsyncCursor; import org.apache.ignite.lang.Cursor; import org.apache.ignite.table.DataStreamerItem; import org.apache.ignite.table.DataStreamerOptions; +import org.apache.ignite.table.DataStreamerReceiverDescriptor; import org.apache.ignite.table.DataStreamerTarget; -import org.apache.ignite.table.ReceiverDescriptor; import org.apache.ignite.table.criteria.Criteria; import org.apache.ignite.table.criteria.CriteriaQueryOptions; import org.apache.ignite.table.criteria.CriteriaQuerySource; @@ -53,22 +53,22 @@ abstract class PublicApiClientViewBase<T> implements DataStreamerTarget<T>, Crit } @Override - public <E, V, R, A> CompletableFuture<Void> streamData( + public <E, V, A, R> CompletableFuture<Void> streamData( Publisher<E> publisher, + DataStreamerReceiverDescriptor<V, A, R> receiver, Function<E, T> keyFunc, Function<E, V> payloadFunc, - ReceiverDescriptor<A> receiver, + @Nullable A receiverArg, @Nullable Flow.Subscriber<R> resultSubscriber, - @Nullable DataStreamerOptions options, - A receiverArg) { + @Nullable DataStreamerOptions options) { return executeAsyncOp(() -> streamerTarget.streamData( publisher, + receiver, keyFunc, payloadFunc, - receiver, + receiverArg, resultSubscriber, - options, - receiverArg)); + options)); } @Override diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java index e96bb607415..a7c7967fbfa 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java @@ -61,8 +61,8 @@ import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.internal.utils.PrimaryReplica; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.table.DataStreamerReceiverDescriptor; import org.apache.ignite.table.QualifiedName; -import org.apache.ignite.table.ReceiverDescriptor; import org.apache.ignite.table.ReceiverExecutionOptions; import org.jetbrains.annotations.Nullable; @@ -551,8 +551,12 @@ public class FakeInternalTable implements InternalTable, StreamerReceiverRunner } @Override - public <A, I, R> CompletableFuture<Collection<R>> runReceiverAsync(ReceiverDescriptor<A> receiver, @Nullable A receiverArg, - Collection<I> items, ClusterNode node, List<DeploymentUnit> deploymentUnits) { + public <A, I, R> CompletableFuture<Collection<R>> runReceiverAsync( + DataStreamerReceiverDescriptor<I, A, R> receiver, + @Nullable A receiverArg, + Collection<I> items, + ClusterNode node, + List<DeploymentUnit> deploymentUnits) { throw new UnsupportedOperationException("Not implemented"); } diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java index 8baa97a8209..fbfe2a89423 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java @@ -83,8 +83,8 @@ import org.apache.ignite.lang.ErrorGroups.Compute; import org.apache.ignite.lang.IgniteException; import org.apache.ignite.lang.TableNotFoundException; import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.table.DataStreamerReceiverDescriptor; import org.apache.ignite.table.QualifiedName; -import org.apache.ignite.table.ReceiverDescriptor; import org.apache.ignite.table.ReceiverExecutionOptions; import org.apache.ignite.table.Tuple; import org.apache.ignite.table.mapper.Mapper; @@ -587,15 +587,16 @@ public class IgniteComputeImpl implements IgniteComputeInternal, StreamerReceive @Override public <A, I, R> CompletableFuture<Collection<R>> runReceiverAsync( - ReceiverDescriptor<A> receiver, + DataStreamerReceiverDescriptor<I, A, R> receiver, @Nullable A receiverArg, Collection<I> items, ClusterNode node, List<DeploymentUnit> deploymentUnits) { - var payload = StreamerReceiverSerializer.serializeReceiverInfoWithElementCount(receiver, receiverArg, items); + var payload = StreamerReceiverSerializer.serializeReceiverInfoWithElementCount( + receiver, receiverArg, receiver.payloadMarshaller(), receiver.argumentMarshaller(), items); return runReceiverAsync(payload, node, deploymentUnits, receiver.options()) - .thenApply(StreamerReceiverSerializer::deserializeReceiverJobResults); + .thenApply(r -> StreamerReceiverSerializer.deserializeReceiverJobResults(r, receiver.resultMarshaller())); } @Override diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/streamer/StreamerReceiverJob.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/streamer/StreamerReceiverJob.java index 3c77c8a1406..fb34d8ec4cd 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/streamer/StreamerReceiverJob.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/streamer/StreamerReceiverJob.java @@ -44,21 +44,25 @@ public class StreamerReceiverJob implements ComputeJob<byte[], byte[]> { SteamerReceiverInfo receiverInfo = StreamerReceiverSerializer.deserializeReceiverInfo( buf.slice().order(ByteOrder.LITTLE_ENDIAN), - payloadElementCount); + payloadElementCount, + receiverClassName -> { + ClassLoader classLoader = ((JobExecutionContextImpl) context).classLoader(); + Class<DataStreamerReceiver<Object, Object, Object>> receiverClass = ComputeUtils.receiverClass( + classLoader, receiverClassName); - ClassLoader classLoader = ((JobExecutionContextImpl) context).classLoader(); - Class<DataStreamerReceiver<Object, Object, Object>> receiverClass = ComputeUtils.receiverClass( - classLoader, receiverInfo.className()); + return ComputeUtils.instantiateReceiver(receiverClass); + }); - DataStreamerReceiver<Object, Object, Object> receiver = ComputeUtils.instantiateReceiver(receiverClass); DataStreamerReceiverContext receiverContext = context::ignite; + DataStreamerReceiver<Object, Object, Object> receiver = receiverInfo.receiver(); + CompletableFuture<List<Object>> receiverRes = receiver.receive(receiverInfo.items(), receiverContext, receiverInfo.arg()); if (receiverRes == null) { return CompletableFuture.completedFuture(null); } - return receiverRes.thenApply(StreamerReceiverSerializer::serializeReceiverJobResults); + return receiverRes.thenApply(r -> StreamerReceiverSerializer.serializeReceiverJobResults(r, receiver.resultMarshaller())); } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java index 76f8373f40c..a5656216a53 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java @@ -55,6 +55,8 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.ignite.Ignite; import org.apache.ignite.internal.ClusterPerClassIntegrationTest; +import org.apache.ignite.marshalling.ByteArrayMarshaller; +import org.apache.ignite.marshalling.Marshaller; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.raft.jraft.test.TestUtils; import org.apache.ignite.sql.IgniteSql; @@ -64,6 +66,7 @@ import org.apache.ignite.table.DataStreamerOperationType; import org.apache.ignite.table.DataStreamerOptions; import org.apache.ignite.table.DataStreamerReceiver; import org.apache.ignite.table.DataStreamerReceiverContext; +import org.apache.ignite.table.DataStreamerReceiverDescriptor; import org.apache.ignite.table.DataStreamerTarget; import org.apache.ignite.table.KeyValueView; import org.apache.ignite.table.ReceiverDescriptor; @@ -418,6 +421,65 @@ public abstract class ItAbstractDataStreamerTest extends ClusterPerClassIntegrat var resultSubscriber = returnResults ? new TestSubscriber<String>() : null; + try (var publisher = new SubmissionPublisher<Tuple>()) { + streamerFut = target.streamData( + publisher, + DataStreamerReceiverDescriptor.builder(TestReceiver.class).build(), + keyFunc, + t -> t.stringValue(1), + "arg1", + resultSubscriber, + DataStreamerOptions.builder().retryLimit(0).build() + ); + + // Same ID goes to the same partition. + publisher.submit(tuple(1, "val1")); + publisher.submit(tuple(1, "val2")); + publisher.submit(tuple(1, "val3")); + } + + assertThat(streamerFut, willCompleteSuccessfully()); + + if (returnResults) { + assertEquals(1, resultSubscriber.items.size()); + assertEquals("Received: 3 items, arg1 arg", resultSubscriber.items.iterator().next()); + } + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testWithReceiverRecordBinaryViewDeprecated(boolean returnResults) { + testWithReceiverDeprecated(defaultTable().recordView(), Function.identity(), returnResults); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testWithReceiverKvBinaryViewDeprecated(boolean returnResults) { + testWithReceiverDeprecated(defaultTable().keyValueView(), t -> Map.entry(t, t), returnResults); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testWithReceiverRecordPojoViewDeprecated(boolean returnResults) { + RecordView<PersonPojo> view = defaultTable().recordView(PersonPojo.class); + + testWithReceiverDeprecated(view, t -> new PersonPojo(t.intValue(0)), returnResults); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testWithReceiverKvPojoViewDeprecated(boolean returnResults) { + KeyValueView<Integer, PersonValPojo> view = defaultTable().keyValueView(Mapper.of(Integer.class), Mapper.of(PersonValPojo.class)); + + testWithReceiverDeprecated(view, t -> Map.entry(t.intValue(0), new PersonValPojo()), returnResults); + } + + @SuppressWarnings("deprecation") + private static <T> void testWithReceiverDeprecated(DataStreamerTarget<T> target, Function<Tuple, T> keyFunc, boolean returnResults) { + CompletableFuture<Void> streamerFut; + + var resultSubscriber = returnResults ? new TestSubscriber<String>() : null; + try (var publisher = new SubmissionPublisher<Tuple>()) { streamerFut = target.streamData( publisher, @@ -456,10 +518,12 @@ public abstract class ItAbstractDataStreamerTest extends ClusterPerClassIntegrat try (var publisher = new SubmissionPublisher<Tuple>()) { streamerFut = view.streamData( publisher, + DataStreamerReceiverDescriptor.builder(NodeNameReceiver.class).build(), t -> t, t -> t.intValue(0), - ReceiverDescriptor.builder(NodeNameReceiver.class).build(), - null, null, null + null, + null, + null ); for (int i = 0; i < count; i++) { @@ -482,18 +546,17 @@ public abstract class ItAbstractDataStreamerTest extends ClusterPerClassIntegrat public void testReceiverException(boolean async) { CompletableFuture<Void> streamerFut; - Object key = 0; Tuple item = tupleKey(1); try (var publisher = new SubmissionPublisher<Tuple>()) { streamerFut = defaultTable().recordView().streamData( publisher, + DataStreamerReceiverDescriptor.builder(TestReceiver.class).build(), t -> t, - t -> key, - ReceiverDescriptor.builder(TestReceiver.class).build(), + t -> "", + async ? "throw-async" : "throw", null, - DataStreamerOptions.builder().retryLimit(0).pageSize(1).build(), - async ? "throw-async" : "throw"); + DataStreamerOptions.builder().retryLimit(0).pageSize(1).build()); publisher.submit(item); } @@ -572,12 +635,12 @@ public abstract class ItAbstractDataStreamerTest extends ClusterPerClassIntegrat streamerFut = defaultTable().recordView().streamData( publisher, + DataStreamerReceiverDescriptor.builder(TupleReceiver.class).build(), Function.identity(), Function.identity(), - ReceiverDescriptor.builder(TupleReceiver.class).build(), + receiverArg, resultSubscriber, - null, - receiverArg + null ); // Tuple payload. @@ -649,6 +712,79 @@ public abstract class ItAbstractDataStreamerTest extends ClusterPerClassIntegrat assertEquals(2, resTupleInner2.intValue("int")); } + @ParameterizedTest + @ValueSource(strings = {"arg1", ""}) + public void testMarshallingReceiver(String arg) { + // Check that null arg works. + arg = arg.isEmpty() ? null : arg; + + DataStreamerReceiverDescriptor<String, String, String> desc = DataStreamerReceiverDescriptor + .builder(MarshallingReceiver.class) + .payloadMarshaller(new StringSuffixMarshaller()) + .argumentMarshaller(new StringSuffixMarshaller()) + .resultMarshaller(new StringSuffixMarshaller()) + .build(); + + CompletableFuture<Void> streamerFut; + var resultSubscriber = new TestSubscriber<String>(); + + try (var publisher = new SubmissionPublisher<String>()) { + streamerFut = defaultTable().recordView().streamData( + publisher, + desc, + x -> Tuple.create().set("id", 1), + Function.identity(), + arg, + resultSubscriber, + null + ); + + publisher.submit("val1"); + publisher.submit("val2"); + } + + assertThat(streamerFut, willCompleteSuccessfully()); + assertEquals(2, resultSubscriber.items.size()); + + String expected = "received[arg=" + arg + ":beforeMarshal:afterUnmarshal,val=val1:beforeMarshal:afterUnmarshal]" + + ":beforeMarshal:afterUnmarshal"; + + assertEquals(expected, resultSubscriber.items.get(0)); + } + + @Test + public void testReceiverMarshallerMismatch() { + DataStreamerReceiverDescriptor<String, String, String> desc = DataStreamerReceiverDescriptor + .builder(MarshallingReceiver.class) + .build(); + + CompletableFuture<Void> streamerFut; + + try (var publisher = new SubmissionPublisher<String>()) { + streamerFut = defaultTable().recordView().streamData( + publisher, + desc, + x -> Tuple.create().set("id", 1), + Function.identity(), + "arg", + null, + null + ); + + publisher.submit("val1"); + } + + var ex = assertThrows(CompletionException.class, () -> streamerFut.orTimeout(10, TimeUnit.SECONDS).join()); + DataStreamerException dsEx = (DataStreamerException) ex.getCause(); + + assertThat(dsEx.getMessage(), containsString( + "Marshaller is defined in the DataStreamerReceiver implementation, " + + "expected argument type: `byte[]`, actual: `class java.lang.String`. " + + "Ensure that DataStreamerReceiverDescriptor marshallers match DataStreamerReceiver marshallers.")); + + assertEquals("IGN-COMPUTE-13", dsEx.codeAsString()); + } + private Tuple receiverTupleRoundTrip(Tuple tuple, boolean asArg) { CompletableFuture<Void> streamerFut; var resultSubscriber = new TestSubscriber<Tuple>(); @@ -658,12 +794,12 @@ public abstract class ItAbstractDataStreamerTest extends ClusterPerClassIntegrat streamerFut = defaultTable().recordView().streamData( publisher, + DataStreamerReceiverDescriptor.builder(TupleReceiver.class).build(), Function.identity(), Function.identity(), - ReceiverDescriptor.builder(TupleReceiver.class).build(), + receiverArg, resultSubscriber, - null, - receiverArg + null ); publisher.submit(asArg ? Tuple.create() : tuple); @@ -811,4 +947,42 @@ public abstract class ItAbstractDataStreamerTest extends ClusterPerClassIntegrat return CompletableFuture.completedFuture(page); } } + + private static class StringSuffixMarshaller implements ByteArrayMarshaller<String> { + @Override + public byte @Nullable [] marshal(@Nullable String object) { + return ByteArrayMarshaller.super.marshal(object + ":beforeMarshal"); + } + + @Override + public @Nullable String unmarshal(byte @Nullable [] raw) { + return ByteArrayMarshaller.super.unmarshal(raw) + ":afterUnmarshal"; + } + } + + private static class MarshallingReceiver implements DataStreamerReceiver<String, String, String> { + @Override + public @Nullable CompletableFuture<List<String>> receive(List<String> page, DataStreamerReceiverContext ctx, @Nullable String arg) { + var results = page.stream() + .map(s -> "received[arg=" + arg + ",val=" + s + "]") + .collect(Collectors.toList()); + + return CompletableFuture.completedFuture(results); + } + + @Override + public @Nullable Marshaller<String, byte[]> payloadMarshaller() { + return new StringSuffixMarshaller(); + } + + @Override + public @Nullable Marshaller<String, byte[]> argumentMarshaller() { + return new StringSuffixMarshaller(); + } + + @Override + public @Nullable Marshaller<String, byte[]> resultMarshaller() { + return new StringSuffixMarshaller(); + } + } } diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofKeyValueView.java b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofKeyValueView.java index 41af0bd99eb..9231930b5df 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofKeyValueView.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofKeyValueView.java @@ -30,8 +30,8 @@ import org.apache.ignite.lang.Cursor; import org.apache.ignite.lang.NullableValue; import org.apache.ignite.table.DataStreamerItem; import org.apache.ignite.table.DataStreamerOptions; +import org.apache.ignite.table.DataStreamerReceiverDescriptor; import org.apache.ignite.table.KeyValueView; -import org.apache.ignite.table.ReceiverDescriptor; import org.apache.ignite.table.criteria.Criteria; import org.apache.ignite.table.criteria.CriteriaQueryOptions; import org.apache.ignite.tx.Transaction; @@ -272,16 +272,15 @@ class RestartProofKeyValueView<K, V> extends RestartProofApiObject<KeyValueView< } @Override - public <E, V1, R, A> CompletableFuture<Void> streamData( + public <E, V1, A, R> CompletableFuture<Void> streamData( Publisher<E> publisher, + DataStreamerReceiverDescriptor<V1, A, R> receiver, Function<E, Entry<K, V>> keyFunc, Function<E, V1> payloadFunc, - ReceiverDescriptor<A> receiver, + @Nullable A receiverArg, @Nullable Subscriber<R> resultSubscriber, - @Nullable DataStreamerOptions options, - @Nullable A receiverArg - ) { - return attachedAsync(view -> view.streamData(publisher, keyFunc, payloadFunc, receiver, resultSubscriber, options, receiverArg)); + @Nullable DataStreamerOptions options) { + return attachedAsync(view -> view.streamData(publisher, receiver, keyFunc, payloadFunc, receiverArg, resultSubscriber, options)); } // TODO: IGNITE-23011 - support cursor transparency? diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofRecordView.java b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofRecordView.java index 5b98b47bf1c..0fc37d3919b 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofRecordView.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofRecordView.java @@ -30,7 +30,7 @@ import org.apache.ignite.lang.AsyncCursor; import org.apache.ignite.lang.Cursor; import org.apache.ignite.table.DataStreamerItem; import org.apache.ignite.table.DataStreamerOptions; -import org.apache.ignite.table.ReceiverDescriptor; +import org.apache.ignite.table.DataStreamerReceiverDescriptor; import org.apache.ignite.table.RecordView; import org.apache.ignite.table.criteria.Criteria; import org.apache.ignite.table.criteria.CriteriaQueryOptions; @@ -235,16 +235,15 @@ class RestartProofRecordView<R> extends RestartProofApiObject<RecordView<R>> imp } @Override - public <E, V, R1, A> CompletableFuture<Void> streamData( + public <E, V, A, R1> CompletableFuture<Void> streamData( Publisher<E> publisher, + DataStreamerReceiverDescriptor<V, A, R1> receiver, Function<E, R> keyFunc, Function<E, V> payloadFunc, - ReceiverDescriptor<A> receiver, + @Nullable A receiverArg, @Nullable Subscriber<R1> resultSubscriber, - @Nullable DataStreamerOptions options, - @Nullable A receiverArg - ) { - return attachedAsync(view -> view.streamData(publisher, keyFunc, payloadFunc, receiver, resultSubscriber, options, receiverArg)); + @Nullable DataStreamerOptions options) { + return attachedAsync(view -> view.streamData(publisher, receiver, keyFunc, payloadFunc, receiverArg, resultSubscriber, options)); } // TODO: IGNITE-23011 - support cursor transparency? diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/AsyncApiKeyValueViewAdapter.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/AsyncApiKeyValueViewAdapter.java index 709315b0f83..e4a40d278c4 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/AsyncApiKeyValueViewAdapter.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/AsyncApiKeyValueViewAdapter.java @@ -31,9 +31,9 @@ import org.apache.ignite.lang.Cursor; import org.apache.ignite.lang.NullableValue; import org.apache.ignite.table.DataStreamerItem; import org.apache.ignite.table.DataStreamerOptions; +import org.apache.ignite.table.DataStreamerReceiverDescriptor; import org.apache.ignite.table.DataStreamerTarget; import org.apache.ignite.table.KeyValueView; -import org.apache.ignite.table.ReceiverDescriptor; import org.apache.ignite.table.criteria.Criteria; import org.apache.ignite.table.criteria.CriteriaQueryOptions; import org.apache.ignite.table.criteria.CriteriaQuerySource; @@ -301,9 +301,10 @@ public class AsyncApiKeyValueViewAdapter<K, V> implements KeyValueView<K, V> { } @Override - public <E, S, R, A> CompletableFuture<Void> streamData(Publisher<E> publisher, Function<E, Entry<K, V>> keyFunc, - Function<E, S> payloadFunc, ReceiverDescriptor<A> receiver, @Nullable Flow.Subscriber<R> resultSubscriber, - @Nullable DataStreamerOptions options, @Nullable A receiverArg) { + public <E, S, A, R> CompletableFuture<Void> streamData(Publisher<E> publisher, DataStreamerReceiverDescriptor<S, A, R> receiver, + Function<E, Entry<K, V>> keyFunc, Function<E, S> payloadFunc, @Nullable A receiverArg, + @Nullable Flow.Subscriber<R> resultSubscriber, + @Nullable DataStreamerOptions options) { throw new UnsupportedOperationException(); } } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/AsyncApiRecordViewAdapter.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/AsyncApiRecordViewAdapter.java index 76c873083d8..fd8d971b9d3 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/AsyncApiRecordViewAdapter.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/AsyncApiRecordViewAdapter.java @@ -29,8 +29,8 @@ import org.apache.ignite.lang.AsyncCursor; import org.apache.ignite.lang.Cursor; import org.apache.ignite.table.DataStreamerItem; import org.apache.ignite.table.DataStreamerOptions; +import org.apache.ignite.table.DataStreamerReceiverDescriptor; import org.apache.ignite.table.DataStreamerTarget; -import org.apache.ignite.table.ReceiverDescriptor; import org.apache.ignite.table.RecordView; import org.apache.ignite.table.criteria.Criteria; import org.apache.ignite.table.criteria.CriteriaQueryOptions; @@ -46,7 +46,7 @@ import org.jetbrains.annotations.Nullable; public class AsyncApiRecordViewAdapter<V> implements RecordView<V> { private final RecordView<V> delegate; - public AsyncApiRecordViewAdapter(RecordView<V> delegate) { + AsyncApiRecordViewAdapter(RecordView<V> delegate) { this.delegate = delegate; } @@ -268,9 +268,9 @@ public class AsyncApiRecordViewAdapter<V> implements RecordView<V> { } @Override - public <E, V1, R, A> CompletableFuture<Void> streamData(Publisher<E> publisher, Function<E, V> keyFunc, Function<E, V1> payloadFunc, - ReceiverDescriptor<A> receiver, @Nullable Subscriber<R> resultSubscriber, @Nullable DataStreamerOptions options, - @Nullable A receiverArg) { + public <E, V1, A, R> CompletableFuture<Void> streamData(Publisher<E> publisher, DataStreamerReceiverDescriptor<V1, A, R> receiver, + Function<E, V> keyFunc, Function<E, V1> payloadFunc, @Nullable A receiverArg, @Nullable Subscriber<R> resultSubscriber, + @Nullable DataStreamerOptions options) { throw new UnsupportedOperationException(); } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java index 437053fdf77..958c48d3a11 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java @@ -54,8 +54,8 @@ import org.apache.ignite.sql.ResultSetMetadata; import org.apache.ignite.sql.SqlRow; import org.apache.ignite.table.DataStreamerItem; import org.apache.ignite.table.DataStreamerOptions; +import org.apache.ignite.table.DataStreamerReceiverDescriptor; import org.apache.ignite.table.KeyValueView; -import org.apache.ignite.table.ReceiverDescriptor; import org.apache.ignite.table.Tuple; import org.apache.ignite.tx.Transaction; import org.jetbrains.annotations.Nullable; @@ -576,14 +576,14 @@ public class KeyValueBinaryViewImpl extends AbstractTableView<Entry<Tuple, Tuple } @Override - public <E, V, R, A> CompletableFuture<Void> streamData( + public <E, V, A, R> CompletableFuture<Void> streamData( Publisher<E> publisher, + DataStreamerReceiverDescriptor<V, A, R> receiver, Function<E, Entry<Tuple, Tuple>> keyFunc, Function<E, V> payloadFunc, - ReceiverDescriptor<A> receiver, + @Nullable A receiverArg, @Nullable Flow.Subscriber<R> resultSubscriber, - @Nullable DataStreamerOptions options, - @Nullable A receiverArg) { + @Nullable DataStreamerOptions options) { Objects.requireNonNull(publisher); Objects.requireNonNull(keyFunc); Objects.requireNonNull(payloadFunc); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java index 9db6fd2f984..8648a08819c 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java @@ -64,8 +64,8 @@ import org.apache.ignite.sql.ResultSetMetadata; import org.apache.ignite.sql.SqlRow; import org.apache.ignite.table.DataStreamerItem; import org.apache.ignite.table.DataStreamerOptions; +import org.apache.ignite.table.DataStreamerReceiverDescriptor; import org.apache.ignite.table.KeyValueView; -import org.apache.ignite.table.ReceiverDescriptor; import org.apache.ignite.table.mapper.Mapper; import org.apache.ignite.tx.Transaction; import org.jetbrains.annotations.Nullable; @@ -816,14 +816,14 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView<Entry<K, V>> imple } @Override - public <E, V1, R, A> CompletableFuture<Void> streamData( + public <E, V1, A, R> CompletableFuture<Void> streamData( Publisher<E> publisher, + DataStreamerReceiverDescriptor<V1, A, R> receiver, Function<E, Entry<K, V>> keyFunc, Function<E, V1> payloadFunc, - ReceiverDescriptor<A> receiver, + @Nullable A receiverArg, @Nullable Flow.Subscriber<R> resultSubscriber, - @Nullable DataStreamerOptions options, - @Nullable A receiverArg) { + @Nullable DataStreamerOptions options) { Objects.requireNonNull(publisher); Objects.requireNonNull(keyFunc); Objects.requireNonNull(payloadFunc); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/PublicApiThreadingViewBase.java b/modules/table/src/main/java/org/apache/ignite/internal/table/PublicApiThreadingViewBase.java index f4cf5e01a47..165ffa8813e 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/PublicApiThreadingViewBase.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/PublicApiThreadingViewBase.java @@ -28,8 +28,8 @@ import org.apache.ignite.lang.AsyncCursor; import org.apache.ignite.lang.Cursor; import org.apache.ignite.table.DataStreamerItem; import org.apache.ignite.table.DataStreamerOptions; +import org.apache.ignite.table.DataStreamerReceiverDescriptor; import org.apache.ignite.table.DataStreamerTarget; -import org.apache.ignite.table.ReceiverDescriptor; import org.apache.ignite.table.criteria.Criteria; import org.apache.ignite.table.criteria.CriteriaQueryOptions; import org.apache.ignite.table.criteria.CriteriaQuerySource; @@ -58,22 +58,22 @@ abstract class PublicApiThreadingViewBase<T> implements DataStreamerTarget<T>, C } @Override - public <E, V, R, A> CompletableFuture<Void> streamData( + public <E, V, A, R> CompletableFuture<Void> streamData( Publisher<E> publisher, + DataStreamerReceiverDescriptor<V, A, R> receiver, Function<E, T> keyFunc, Function<E, V> payloadFunc, - ReceiverDescriptor<A> receiver, + @Nullable A receiverArg, @Nullable Flow.Subscriber<R> resultSubscriber, - @Nullable DataStreamerOptions options, - A receiverArg) { + @Nullable DataStreamerOptions options) { return executeAsyncOp(() -> streamerTarget.streamData( publisher, + receiver, keyFunc, payloadFunc, - receiver, + receiverArg, resultSubscriber, - options, - receiverArg)); + options)); } @Override diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java index befa290a258..fdbbe6162a3 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java @@ -48,7 +48,7 @@ import org.apache.ignite.lang.MarshallerException; import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.table.DataStreamerItem; import org.apache.ignite.table.DataStreamerOptions; -import org.apache.ignite.table.ReceiverDescriptor; +import org.apache.ignite.table.DataStreamerReceiverDescriptor; import org.apache.ignite.table.RecordView; import org.apache.ignite.table.Tuple; import org.apache.ignite.tx.Transaction; @@ -575,14 +575,14 @@ public class RecordBinaryViewImpl extends AbstractTableView<Tuple> implements Re } @Override - public <E, V, R, A> CompletableFuture<Void> streamData( + public <E, V, A, R> CompletableFuture<Void> streamData( Publisher<E> publisher, + DataStreamerReceiverDescriptor<V, A, R> receiver, Function<E, Tuple> keyFunc, Function<E, V> payloadFunc, - ReceiverDescriptor<A> receiver, + @Nullable A receiverArg, @Nullable Flow.Subscriber<R> resultSubscriber, - @Nullable DataStreamerOptions options, - @Nullable A receiverArg) { + @Nullable DataStreamerOptions options) { Objects.requireNonNull(publisher); Objects.requireNonNull(keyFunc); Objects.requireNonNull(payloadFunc); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java index 6c8a827bd0d..5045b9fa271 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java @@ -56,7 +56,7 @@ import org.apache.ignite.sql.ResultSetMetadata; import org.apache.ignite.sql.SqlRow; import org.apache.ignite.table.DataStreamerItem; import org.apache.ignite.table.DataStreamerOptions; -import org.apache.ignite.table.ReceiverDescriptor; +import org.apache.ignite.table.DataStreamerReceiverDescriptor; import org.apache.ignite.table.RecordView; import org.apache.ignite.table.mapper.Mapper; import org.apache.ignite.tx.Transaction; @@ -606,14 +606,14 @@ public class RecordViewImpl<R> extends AbstractTableView<R> implements RecordVie } @Override - public <E, V, R1, A> CompletableFuture<Void> streamData( + public <E, V, A, R1> CompletableFuture<Void> streamData( Publisher<E> publisher, + DataStreamerReceiverDescriptor<V, A, R1> receiver, Function<E, R> keyFunc, Function<E, V> payloadFunc, - ReceiverDescriptor<A> receiver, + @Nullable A receiverArg, @Nullable Flow.Subscriber<R1> resultSubscriber, - @Nullable DataStreamerOptions options, - @Nullable A receiverArg) { + @Nullable DataStreamerOptions options) { Objects.requireNonNull(publisher); Objects.requireNonNull(keyFunc); Objects.requireNonNull(payloadFunc); @@ -625,7 +625,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> implements RecordVie .thenCompose(node -> tbl.streamerReceiverRunner().runReceiverAsync( receiver, receiverArg, rows, node, receiver.units()))); - CompletableFuture<Void> future = DataStreamer.<R, E, V, R1>streamData( + CompletableFuture<Void> future = DataStreamer.streamData( publisher, keyFunc, payloadFunc, diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/StreamerReceiverRunner.java b/modules/table/src/main/java/org/apache/ignite/internal/table/StreamerReceiverRunner.java index c15af2e0604..c07e01a43c5 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/StreamerReceiverRunner.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/StreamerReceiverRunner.java @@ -22,7 +22,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import org.apache.ignite.deployment.DeploymentUnit; import org.apache.ignite.network.ClusterNode; -import org.apache.ignite.table.ReceiverDescriptor; +import org.apache.ignite.table.DataStreamerReceiverDescriptor; import org.apache.ignite.table.ReceiverExecutionOptions; import org.jetbrains.annotations.Nullable; @@ -44,7 +44,7 @@ public interface StreamerReceiverRunner { * @param <R> Result type. */ <A, I, R> CompletableFuture<Collection<R>> runReceiverAsync( - ReceiverDescriptor<A> receiver, + DataStreamerReceiverDescriptor<I, A, R> receiver, @Nullable A receiverArg, Collection<I> items, ClusterNode node,