This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 04b98e45f68 IGNITE-27270 Use MessageSerializer for DataStreamerResponse (#12558)
04b98e45f68 is described below
commit 04b98e45f6838bf9e63d1091f8ed3a70b0be0843
Author: Dmitry Werner <[email protected]>
AuthorDate: Thu Feb 19 22:02:31 2026 +0500
IGNITE-27270 Use MessageSerializer for DataStreamerResponse (#12558)
---
.../communication/GridIoMessageFactory.java | 3 +-
.../datastreamer/DataStreamProcessor.java | 28 ++---
.../processors/datastreamer/DataStreamerImpl.java | 6 +-
.../datastreamer/DataStreamerResponse.java | 132 +++++++++------------
4 files changed, 68 insertions(+), 101 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 07592169e9c..69b38116fd9 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -48,6 +48,7 @@ import
org.apache.ignite.internal.codegen.CachePartitionsToReloadMapSerializer;
import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer;
import
org.apache.ignite.internal.codegen.ClusterMetricsUpdateMessageSerializer;
import
org.apache.ignite.internal.codegen.ContinuousRoutineStartResultMessageSerializer;
+import org.apache.ignite.internal.codegen.DataStreamerResponseSerializer;
import org.apache.ignite.internal.codegen.ErrorMessageSerializer;
import org.apache.ignite.internal.codegen.ExchangeInfoSerializer;
import
org.apache.ignite.internal.codegen.GenerateEncryptionKeyRequestSerializer;
@@ -420,7 +421,7 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)59, GridCacheQueryResponse::new, new
GridCacheQueryResponseSerializer());
factory.register((short)61, GridContinuousMessage::new);
factory.register((short)62, DataStreamerRequest::new);
- factory.register((short)63, DataStreamerResponse::new);
+ factory.register((short)63, DataStreamerResponse::new, new
DataStreamerResponseSerializer());
factory.register((short)76, GridTaskResultRequest::new, new
GridTaskResultRequestSerializer());
factory.register((short)77, GridTaskResultResponse::new, new
GridTaskResultResponseSerializer());
factory.register((short)78, MissingMappingRequestMessage::new, new
MissingMappingRequestMessageSerializer());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 65f225b22a9..f00624a4e6a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -270,8 +270,7 @@ public class DataStreamProcessor<K, V> extends
GridProcessorAdapter {
topic,
req.requestId(),
new IgniteCheckedException("Failed to get deployment
for request [sndId=" + nodeId +
- ", req=" + req + ']'),
- false);
+ ", req=" + req + ']'));
return;
}
@@ -290,7 +289,7 @@ public class DataStreamProcessor<K, V> extends
GridProcessorAdapter {
catch (IgniteCheckedException e) {
U.error(log, "Failed to unmarshal message [nodeId=" + nodeId +
", req=" + req + ']', e);
- sendResponse(nodeId, topic, req.requestId(), e, false);
+ sendResponse(nodeId, topic, req.requestId(), e);
return;
}
@@ -356,7 +355,7 @@ public class DataStreamProcessor<K, V> extends
GridProcessorAdapter {
}
if (remapErr != null) {
- sendResponse(nodeId, topic, req.requestId(), remapErr,
req.forceLocalDeployment());
+ sendResponse(nodeId, topic, req.requestId(), remapErr);
return;
}
@@ -392,7 +391,7 @@ public class DataStreamProcessor<K, V> extends
GridProcessorAdapter {
try {
job.call();
- sendResponse(nodeId, topic, req.requestId(), null,
req.forceLocalDeployment());
+ sendResponse(nodeId, topic, req.requestId(), null);
}
finally {
if (waitFut != null)
@@ -400,7 +399,7 @@ public class DataStreamProcessor<K, V> extends
GridProcessorAdapter {
}
}
catch (Throwable e) {
- sendResponse(nodeId, topic, req.requestId(), e,
req.forceLocalDeployment());
+ sendResponse(nodeId, topic, req.requestId(), e);
if (e instanceof Error)
throw (Error)e;
@@ -412,22 +411,11 @@ public class DataStreamProcessor<K, V> extends
GridProcessorAdapter {
* @param resTopic Response topic.
* @param reqId Request ID.
* @param err Error.
- * @param forceLocDep Force local deployment.
*/
- private void sendResponse(UUID nodeId, Object resTopic, long reqId,
@Nullable Throwable err,
- boolean forceLocDep) {
- byte[] errBytes;
+ private void sendResponse(UUID nodeId, Object resTopic, long reqId,
@Nullable Throwable err) {
+ DataStreamerResponse res = new DataStreamerResponse(reqId, err);
- try {
- errBytes = err != null ? U.marshal(marsh, err) : null;
- }
- catch (Exception e) {
- U.error(log, "Failed to marshal error [err=" + err + ", marshErr="
+ e + ']', e);
-
- errBytes = marshErrBytes;
- }
-
- DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes,
forceLocDep);
+ res.prepareMarshal(marsh, log, marshErrBytes);
try {
ctx.io().sendToCustomTopic(nodeId, resTopic, res,
threadIoPolicy());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index f154866fc98..3eda0cb9707 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -2099,11 +2099,11 @@ public class DataStreamerImpl<K, V> implements
IgniteDataStreamer<K, V>, Delayed
try {
GridPeerDeployAware jobPda0 = jobPda;
- final Throwable cause = U.unmarshal(
- ctx,
- errBytes,
+ res.finishUnmarshal(ctx.marshaller(),
U.resolveClassLoader(jobPda0 != null ?
jobPda0.classLoader() : null, ctx.config()));
+ final Throwable cause = res.error();
+
final String msg = "DataStreamer request failed [node=" +
nodeId + "]";
if (cause instanceof ClusterTopologyCheckedException)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java
index 20ac080bca0..83b51fb0800 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java
@@ -17,34 +17,37 @@
package org.apache.ignite.internal.processors.datastreamer;
-import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
/**
*
*/
public class DataStreamerResponse implements Message {
/** */
+ @Order(value = 0, method = "requestId")
private long reqId;
/** */
- private byte[] errBytes;
+ private @Nullable Throwable err;
/** */
- private boolean forceLocDep;
+ @Order(value = 1, method = "errorBytes")
+ private @Nullable byte[] errBytes;
/**
* @param reqId Request ID.
- * @param errBytes Error bytes.
- * @param forceLocDep Force local deployment.
+ * @param err Error.
*/
- public DataStreamerResponse(long reqId, byte[] errBytes, boolean
forceLocDep) {
+ public DataStreamerResponse(long reqId, @Nullable Throwable err) {
this.reqId = reqId;
- this.errBytes = errBytes;
- this.forceLocDep = forceLocDep;
+ this.err = err;
}
/**
@@ -61,92 +64,67 @@ public class DataStreamerResponse implements Message {
return reqId;
}
+ /**
+ * @param reqId Request ID.
+ */
+ public void requestId(long reqId) {
+ this.reqId = reqId;
+ }
+
/**
* @return Error bytes.
*/
- public byte[] errorBytes() {
+ public @Nullable byte[] errorBytes() {
return errBytes;
}
/**
- * @return {@code True} to force local deployment.
+ * @param errBytes Error bytes.
*/
- public boolean forceLocalDeployment() {
- return forceLocDep;
+ public void errorBytes(@Nullable byte[] errBytes) {
+ this.errBytes = errBytes;
}
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(DataStreamerResponse.class, this);
+ /**
+ * @return Error.
+ */
+ public Throwable error() {
+ return err;
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
+ /**
+ * @param marsh Marshaller.
+ * @param log Logger.
+ * @param marshErrBytes Marshalled error bytes.
+ */
+ public void prepareMarshal(Marshaller marsh, IgniteLogger log, byte[]
marshErrBytes) {
+ if (err != null && errBytes == null) {
+ try {
+ errBytes = U.marshal(marsh, err);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to marshal error [err=" + err + ",
marshErr=" + e + ']', e);
+
+ errBytes = marshErrBytes;
+ }
}
+ }
- switch (writer.state()) {
- case 0:
- if (!writer.writeByteArray(errBytes))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeBoolean(forceLocDep))
- return false;
-
- writer.incrementState();
-
- case 2:
- if (!writer.writeLong(reqId))
- return false;
-
- writer.incrementState();
+ /**
+ * @param marsh Marshaller.
+ * @param ldr Class loader.
+ */
+ public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws
IgniteCheckedException {
+ if (errBytes != null && err == null) {
+ err = U.unmarshal(marsh, errBytes, ldr);
+ errBytes = null;
}
-
- return true;
}
/** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- switch (reader.state()) {
- case 0:
- errBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- forceLocDep = reader.readBoolean();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 2:
- reqId = reader.readLong();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
+ @Override public String toString() {
+ return S.toString(DataStreamerResponse.class, this);
}
/** {@inheritDoc} */