This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 04b98e45f68 IGNITE-27270 Use MessageSerializer for DataStreamerResponse (#12558)
04b98e45f68 is described below

commit 04b98e45f6838bf9e63d1091f8ed3a70b0be0843
Author: Dmitry Werner <[email protected]>
AuthorDate: Thu Feb 19 22:02:31 2026 +0500

    IGNITE-27270 Use MessageSerializer for DataStreamerResponse (#12558)
---
 .../communication/GridIoMessageFactory.java        |   3 +-
 .../datastreamer/DataStreamProcessor.java          |  28 ++---
 .../processors/datastreamer/DataStreamerImpl.java  |   6 +-
 .../datastreamer/DataStreamerResponse.java         | 132 +++++++++------------
 4 files changed, 68 insertions(+), 101 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 07592169e9c..69b38116fd9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -48,6 +48,7 @@ import 
org.apache.ignite.internal.codegen.CachePartitionsToReloadMapSerializer;
 import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer;
 import 
org.apache.ignite.internal.codegen.ClusterMetricsUpdateMessageSerializer;
 import 
org.apache.ignite.internal.codegen.ContinuousRoutineStartResultMessageSerializer;
+import org.apache.ignite.internal.codegen.DataStreamerResponseSerializer;
 import org.apache.ignite.internal.codegen.ErrorMessageSerializer;
 import org.apache.ignite.internal.codegen.ExchangeInfoSerializer;
 import 
org.apache.ignite.internal.codegen.GenerateEncryptionKeyRequestSerializer;
@@ -420,7 +421,7 @@ public class GridIoMessageFactory implements 
MessageFactoryProvider {
         factory.register((short)59, GridCacheQueryResponse::new, new 
GridCacheQueryResponseSerializer());
         factory.register((short)61, GridContinuousMessage::new);
         factory.register((short)62, DataStreamerRequest::new);
-        factory.register((short)63, DataStreamerResponse::new);
+        factory.register((short)63, DataStreamerResponse::new, new 
DataStreamerResponseSerializer());
         factory.register((short)76, GridTaskResultRequest::new, new 
GridTaskResultRequestSerializer());
         factory.register((short)77, GridTaskResultResponse::new, new 
GridTaskResultResponseSerializer());
         factory.register((short)78, MissingMappingRequestMessage::new, new 
MissingMappingRequestMessageSerializer());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 65f225b22a9..f00624a4e6a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -270,8 +270,7 @@ public class DataStreamProcessor<K, V> extends 
GridProcessorAdapter {
                         topic,
                         req.requestId(),
                         new IgniteCheckedException("Failed to get deployment 
for request [sndId=" + nodeId +
-                            ", req=" + req + ']'),
-                        false);
+                            ", req=" + req + ']'));
 
                     return;
                 }
@@ -290,7 +289,7 @@ public class DataStreamProcessor<K, V> extends 
GridProcessorAdapter {
             catch (IgniteCheckedException e) {
                 U.error(log, "Failed to unmarshal message [nodeId=" + nodeId + 
", req=" + req + ']', e);
 
-                sendResponse(nodeId, topic, req.requestId(), e, false);
+                sendResponse(nodeId, topic, req.requestId(), e);
 
                 return;
             }
@@ -356,7 +355,7 @@ public class DataStreamProcessor<K, V> extends 
GridProcessorAdapter {
                 }
 
                 if (remapErr != null) {
-                    sendResponse(nodeId, topic, req.requestId(), remapErr, 
req.forceLocalDeployment());
+                    sendResponse(nodeId, topic, req.requestId(), remapErr);
 
                     return;
                 }
@@ -392,7 +391,7 @@ public class DataStreamProcessor<K, V> extends 
GridProcessorAdapter {
             try {
                 job.call();
 
-                sendResponse(nodeId, topic, req.requestId(), null, 
req.forceLocalDeployment());
+                sendResponse(nodeId, topic, req.requestId(), null);
             }
             finally {
                 if (waitFut != null)
@@ -400,7 +399,7 @@ public class DataStreamProcessor<K, V> extends 
GridProcessorAdapter {
             }
         }
         catch (Throwable e) {
-            sendResponse(nodeId, topic, req.requestId(), e, 
req.forceLocalDeployment());
+            sendResponse(nodeId, topic, req.requestId(), e);
 
             if (e instanceof Error)
                 throw (Error)e;
@@ -412,22 +411,11 @@ public class DataStreamProcessor<K, V> extends 
GridProcessorAdapter {
      * @param resTopic Response topic.
      * @param reqId Request ID.
      * @param err Error.
-     * @param forceLocDep Force local deployment.
      */
-    private void sendResponse(UUID nodeId, Object resTopic, long reqId, 
@Nullable Throwable err,
-        boolean forceLocDep) {
-        byte[] errBytes;
+    private void sendResponse(UUID nodeId, Object resTopic, long reqId, 
@Nullable Throwable err) {
+        DataStreamerResponse res = new DataStreamerResponse(reqId, err);
 
-        try {
-            errBytes = err != null ? U.marshal(marsh, err) : null;
-        }
-        catch (Exception e) {
-            U.error(log, "Failed to marshal error [err=" + err + ", marshErr=" 
+ e + ']', e);
-
-            errBytes = marshErrBytes;
-        }
-
-        DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, 
forceLocDep);
+        res.prepareMarshal(marsh, log, marshErrBytes);
 
         try {
             ctx.io().sendToCustomTopic(nodeId, resTopic, res, 
threadIoPolicy());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index f154866fc98..3eda0cb9707 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -2099,11 +2099,11 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
                 try {
                     GridPeerDeployAware jobPda0 = jobPda;
 
-                    final Throwable cause = U.unmarshal(
-                        ctx,
-                        errBytes,
+                    res.finishUnmarshal(ctx.marshaller(),
                         U.resolveClassLoader(jobPda0 != null ? 
jobPda0.classLoader() : null, ctx.config()));
 
+                    final Throwable cause = res.error();
+
                     final String msg = "DataStreamer request failed [node=" + 
nodeId + "]";
 
                     if (cause instanceof ClusterTopologyCheckedException)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java
index 20ac080bca0..83b51fb0800 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java
@@ -17,34 +17,37 @@
 
 package org.apache.ignite.internal.processors.datastreamer;
 
-import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
 
 /**
  *
  */
 public class DataStreamerResponse implements Message {
     /** */
+    @Order(value = 0, method = "requestId")
     private long reqId;
 
     /** */
-    private byte[] errBytes;
+    private @Nullable Throwable err;
 
     /** */
-    private boolean forceLocDep;
+    @Order(value = 1, method = "errorBytes")
+    private @Nullable byte[] errBytes;
 
     /**
      * @param reqId Request ID.
-     * @param errBytes Error bytes.
-     * @param forceLocDep Force local deployment.
+     * @param err Error.
      */
-    public DataStreamerResponse(long reqId, byte[] errBytes, boolean 
forceLocDep) {
+    public DataStreamerResponse(long reqId, @Nullable Throwable err) {
         this.reqId = reqId;
-        this.errBytes = errBytes;
-        this.forceLocDep = forceLocDep;
+        this.err = err;
     }
 
     /**
@@ -61,92 +64,67 @@ public class DataStreamerResponse implements Message {
         return reqId;
     }
 
+    /**
+     * @param reqId Request ID.
+     */
+    public void requestId(long reqId) {
+        this.reqId = reqId;
+    }
+
     /**
      * @return Error bytes.
      */
-    public byte[] errorBytes() {
+    public @Nullable byte[] errorBytes() {
         return errBytes;
     }
 
     /**
-     * @return {@code True} to force local deployment.
+     * @param errBytes Error bytes.
      */
-    public boolean forceLocalDeployment() {
-        return forceLocDep;
+    public void errorBytes(@Nullable byte[] errBytes) {
+        this.errBytes = errBytes;
     }
 
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(DataStreamerResponse.class, this);
+    /**
+     * @return Error.
+     */
+    public Throwable error() {
+        return err;
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType()))
-                return false;
-
-            writer.onHeaderWritten();
+    /**
+     * @param marsh Marshaller.
+     * @param log Logger.
+     * @param marshErrBytes Marshalled error bytes.
+     */
+    public void prepareMarshal(Marshaller marsh, IgniteLogger log, byte[] 
marshErrBytes) {
+        if (err != null && errBytes == null) {
+            try {
+                errBytes = U.marshal(marsh, err);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to marshal error [err=" + err + ", 
marshErr=" + e + ']', e);
+
+                errBytes = marshErrBytes;
+            }
         }
+    }
 
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeByteArray(errBytes))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
-                if (!writer.writeBoolean(forceLocDep))
-                    return false;
-
-                writer.incrementState();
-
-            case 2:
-                if (!writer.writeLong(reqId))
-                    return false;
-
-                writer.incrementState();
+    /**
+     * @param marsh Marshaller.
+     * @param ldr Class loader.
+     */
+    public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws 
IgniteCheckedException {
+        if (errBytes != null && err == null) {
+            err = U.unmarshal(marsh, errBytes, ldr);
 
+            errBytes = null;
         }
-
-        return true;
     }
 
     /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        switch (reader.state()) {
-            case 0:
-                errBytes = reader.readByteArray();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
-                forceLocDep = reader.readBoolean();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 2:
-                reqId = reader.readLong();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return true;
+    @Override public String toString() {
+        return S.toString(DataStreamerResponse.class, this);
     }
 
     /** {@inheritDoc} */

Reply via email to