This is an automated email from the ASF dual-hosted git repository.

shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 669372fa419 IGNITE-26817 Refactor GridJobExecuteResponse (#12474)
669372fa419 is described below

commit 669372fa4198eb36527ac073bcf05e92d78cb78d
Author: Vladimir Steshin <[email protected]>
AuthorDate: Tue Nov 4 21:39:56 2025 +0300

    IGNITE-26817 Refactor GridJobExecuteResponse (#12474)
---
 .../ignite/internal/GridJobExecuteResponse.java    | 307 ++++++++++-----------
 .../communication/GridIoMessageFactory.java        |   3 +-
 .../internal/processors/job/GridJobProcessor.java  |   6 +-
 .../internal/processors/job/GridJobWorker.java     |  70 +----
 .../processors/task/GridTaskProcessor.java         |   2 +-
 .../internal/processors/task/GridTaskWorker.java   |  34 +--
 6 files changed, 169 insertions(+), 253 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java
index be468be29a1..0ede431ce19 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java
@@ -17,17 +17,21 @@
 
 package org.apache.ignite.internal;
 
-import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.managers.communication.ErrorMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -35,44 +39,52 @@ import org.jetbrains.annotations.Nullable;
  */
 public class GridJobExecuteResponse implements Message {
     /** */
+    @Order(0)
     private UUID nodeId;
 
     /** */
+    @Order(value = 1, method = "sessionId")
     private IgniteUuid sesId;
 
     /** */
+    @Order(2)
     private IgniteUuid jobId;
 
     /** */
-    private byte[] gridExBytes;
-
-    /** */
-    @GridDirectTransient
     private IgniteException gridEx;
 
-    /** */
-    private byte[] resBytes;
+    /**
+     * Serialization call holder for {@code gridEx}. Works with {@link #marshallUserData(Marshaller, IgniteLogger)}.
+     * Also wraps a possible serialization error.
+     */
+    @Order(value = 3, method = "exceptionMessage")
+    private @Nullable ErrorMessage gridExMsg;
+
+    /** Job result serialization call holder. */
+    @Order(value = 4, method = "jobResultBytes")
+    private @Nullable byte[] resBytes;
 
     /** */
-    @GridDirectTransient
-    private Object res;
+    private @Nullable Object res;
 
     /** */
+    /** Job attributes serialization call holder. */
+    @Order(value = 5, method = "jobAttrubutesBytes")
     private byte[] jobAttrsBytes;
 
     /** */
-    @GridDirectTransient
     private Map<Object, Object> jobAttrs;
 
     /** */
+    @Order(value = 6, method = "cancelled")
     private boolean isCancelled;
 
     /** */
     @GridToStringExclude
-    @GridDirectTransient
     private IgniteException fakeEx;
 
-    /** */
+    /** Retry topology version. */
+    @Order(value = 7, method = "retryTopologyVersion")
     private AffinityTopologyVersion retry;
 
     /**
@@ -86,11 +98,8 @@ public class GridJobExecuteResponse implements Message {
      * @param nodeId Sender node ID.
      * @param sesId Task session ID
      * @param jobId Job ID.
-     * @param gridExBytes Serialized grid exception.
      * @param gridEx Grid exception.
-     * @param resBytes Serialized result.
      * @param res Result.
-     * @param jobAttrsBytes Serialized job attributes.
      * @param jobAttrs Job attributes.
      * @param isCancelled Whether job was cancelled or not.
      * @param retry Topology version for that partitions haven't been reserved 
on the affinity node.
@@ -98,11 +107,8 @@ public class GridJobExecuteResponse implements Message {
     public GridJobExecuteResponse(UUID nodeId,
         IgniteUuid sesId,
         IgniteUuid jobId,
-        byte[] gridExBytes,
-        IgniteException gridEx,
-        byte[] resBytes,
-        Object res,
-        byte[] jobAttrsBytes,
+        @Nullable IgniteException gridEx,
+        @Nullable Object res,
         Map<Object, Object> jobAttrs,
         boolean isCancelled,
         AffinityTopologyVersion retry
@@ -114,37 +120,49 @@ public class GridJobExecuteResponse implements Message {
         this.nodeId = nodeId;
         this.sesId = sesId;
         this.jobId = jobId;
-        this.gridExBytes = gridExBytes;
-        this.gridEx = gridEx;
-        this.resBytes = resBytes;
         this.res = res;
-        this.jobAttrsBytes = jobAttrsBytes;
         this.jobAttrs = jobAttrs;
         this.isCancelled = isCancelled;
         this.retry = retry;
+        this.gridEx = gridEx;
     }
 
     /**
      * @return Task session ID.
      */
-    public IgniteUuid getSessionId() {
+    public IgniteUuid sessionId() {
         return sesId;
     }
 
+    /** */
+    public void sessionId(IgniteUuid sesId) {
+        this.sesId = sesId;
+    }
+
     /**
      * @return Job ID.
      */
-    public IgniteUuid getJobId() {
+    public IgniteUuid jobId() {
         return jobId;
     }
 
+    /** */
+    public void jobId(IgniteUuid jobId) {
+        this.jobId = jobId;
+    }
+
     /**
      * @return Serialized job result.
      */
-    @Nullable public byte[] getJobResultBytes() {
+    @Nullable public byte[] jobResultBytes() {
         return resBytes;
     }
 
+    /** */
+    public void jobResultBytes(@Nullable byte[] resBytes) {
+        this.resBytes = resBytes;
+    }
+
     /**
      * @return Job result.
      */
@@ -152,27 +170,43 @@ public class GridJobExecuteResponse implements Message {
         return res;
     }
 
-    /**
-     * @return Serialized job exception.
-     */
-    @Nullable public byte[] getExceptionBytes() {
-        return gridExBytes;
-    }
-
     /**
      * @return Job exception.
      */
-    @Nullable public IgniteException getException() {
+    @Nullable public IgniteException exception() {
         return gridEx;
     }
 
+    /** */
+    public void exceptionMessage(@Nullable ErrorMessage gridExMsg) {
+        if (gridExMsg == null) {
+            gridEx = null;
+
+            return;
+        }
+
+        Throwable t = gridExMsg.error();
+
+        gridEx = t instanceof IgniteException ? (IgniteException)t : new 
IgniteException(t);
+    }
+
+    /** */
+    public @Nullable ErrorMessage exceptionMessage() {
+        return gridEx == null ? null : new ErrorMessage(gridEx);
+    }
+
     /**
      * @return Serialized job attributes.
      */
-    @Nullable public byte[] getJobAttributesBytes() {
+    @Nullable public byte[] jobAttrubutesBytes() {
         return jobAttrsBytes;
     }
 
+    /** */
+    public void jobAttrubutesBytes(@Nullable byte[] jobAttrsBytes) {
+        this.jobAttrsBytes = jobAttrsBytes;
+    }
+
     /**
      * @return Job attributes.
      */
@@ -183,17 +217,27 @@ public class GridJobExecuteResponse implements Message {
     /**
      * @return Job cancellation status.
      */
-    public boolean isCancelled() {
+    public boolean cancelled() {
         return isCancelled;
     }
 
+    /** */
+    public void cancelled(boolean cancelled) {
+        isCancelled = cancelled;
+    }
+
     /**
      * @return Sender node ID.
      */
-    public UUID getNodeId() {
+    public UUID nodeId() {
         return nodeId;
     }
 
+    /** */
+    public void nodeId(UUID nodeId) {
+        this.nodeId = nodeId;
+    }
+
     /**
      * @return Fake exception.
      */
@@ -219,147 +263,82 @@ public class GridJobExecuteResponse implements Message {
      * @return Topology version for that specified partitions haven't been 
reserved
      *          on the affinity node.
      */
-    public AffinityTopologyVersion getRetryTopologyVersion() {
-        return retry != null ? retry : AffinityTopologyVersion.NONE;
+    public @Nullable AffinityTopologyVersion retryTopologyVersion() {
+        return retry;
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType()))
-                return false;
+    /** */
+    public void retryTopologyVersion(@Nullable AffinityTopologyVersion retry) {
+        this.retry = retry;
+    }
 
-            writer.onHeaderWritten();
+    /**
+     * Serializes user data to byte[] with the provided marshaller.
+     * Erases non-marshalled data like {@link #getJobAttributes()} or {@link #getJobResult()}.
+     */
+    public void marshallUserData(Marshaller marsh, @Nullable IgniteLogger log) 
{
+        if (res != null) {
+            try {
+                resBytes = U.marshal(marsh, res);
+            }
+            catch (IgniteCheckedException e) {
+                resBytes = null;
+
+                String msg = "Failed to serialize job response [nodeId=" + 
nodeId +
+                    ", ses=" + sesId + ", jobId=" + jobId +
+                    ", resCls=" + (res == null ? null : res.getClass()) + ']';
+
+                wrapSerializationError(e, msg, log);
+            }
+
+            res = null;
         }
 
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeByteArray(gridExBytes))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
-                if (!writer.writeBoolean(isCancelled))
-                    return false;
-
-                writer.incrementState();
-
-            case 2:
-                if (!writer.writeByteArray(jobAttrsBytes))
-                    return false;
-
-                writer.incrementState();
-
-            case 3:
-                if (!writer.writeIgniteUuid(jobId))
-                    return false;
+        if (!F.isEmpty(jobAttrs)) {
+            try {
+                jobAttrsBytes = U.marshal(marsh, jobAttrs);
+            }
+            catch (IgniteCheckedException e) {
+                jobAttrsBytes = null;
 
-                writer.incrementState();
+                String msg = "Failed to serialize job attributes [nodeId=" + 
nodeId +
+                    ", ses=" + sesId + ", jobId=" + jobId +
+                    ", attrs=" + jobAttrs + ']';
 
-            case 4:
-                if (!writer.writeUuid(nodeId))
-                    return false;
-
-                writer.incrementState();
-
-            case 5:
-                if (!writer.writeByteArray(resBytes))
-                    return false;
-
-                writer.incrementState();
-
-            case 6:
-                if (!writer.writeAffinityTopologyVersion(retry))
-                    return false;
-
-                writer.incrementState();
-
-            case 7:
-                if (!writer.writeIgniteUuid(sesId))
-                    return false;
-
-                writer.incrementState();
+                wrapSerializationError(e, msg, log);
+            }
 
+            jobAttrs = null;
         }
-
-        return true;
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        switch (reader.state()) {
-            case 0:
-                gridExBytes = reader.readByteArray();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
-                isCancelled = reader.readBoolean();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 2:
-                jobAttrsBytes = reader.readByteArray();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 3:
-                jobId = reader.readIgniteUuid();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 4:
-                nodeId = reader.readUuid();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 5:
-                resBytes = reader.readByteArray();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 6:
-                retry = reader.readAffinityTopologyVersion();
-
-                if (!reader.isLastRead())
-                    return false;
+    /**
+     * Deserializes user data from byte[] with the provided marshaller and class loader.
+     * Erases marshalled data like {@link #jobAttrubutesBytes()} or {@link #jobResultBytes()}.
+     */
+    public void unmarshallUserData(Marshaller marshaller, ClassLoader clsLdr) 
throws IgniteCheckedException {
+        if (jobAttrsBytes != null) {
+            jobAttrs = U.unmarshal(marshaller, jobAttrsBytes, clsLdr);
 
-                reader.incrementState();
+            jobAttrsBytes = null;
+        }
 
-            case 7:
-                sesId = reader.readIgniteUuid();
+        if (resBytes != null) {
+            res = U.unmarshal(marshaller, resBytes, clsLdr);
 
-                if (!reader.isLastRead())
-                    return false;
+            resBytes = null;
+        }
+    }
 
-                reader.incrementState();
+    /** */
+    private void wrapSerializationError(IgniteCheckedException e, String msg, 
@Nullable IgniteLogger log) {
+        if (gridEx != null)
+            e.addSuppressed(gridEx);
 
-        }
+        gridEx = U.convertException(e);
 
-        return true;
+        if (log != null && (log.isDebugEnabled() || !X.hasCause(e, 
NodeStoppingException.class)))
+            U.error(log, msg, e);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 09064df08e8..cf07a3fcfb8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -76,6 +76,7 @@ import 
org.apache.ignite.internal.codegen.GridDistributedTxFinishResponseSeriali
 import 
org.apache.ignite.internal.codegen.GridDistributedTxPrepareRequestSerializer;
 import 
org.apache.ignite.internal.codegen.GridDistributedTxPrepareResponseSerializer;
 import org.apache.ignite.internal.codegen.GridJobCancelRequestSerializer;
+import org.apache.ignite.internal.codegen.GridJobExecuteResponseSerializer;
 import org.apache.ignite.internal.codegen.GridJobSiblingsRequestSerializer;
 import 
org.apache.ignite.internal.codegen.GridNearAtomicCheckUpdateRequestSerializer;
 import 
org.apache.ignite.internal.codegen.GridNearAtomicSingleUpdateFilterRequestSerializer;
@@ -310,7 +311,7 @@ public class GridIoMessageFactory implements 
MessageFactoryProvider {
         factory.register(TcpCommunicationSpi.HANDSHAKE_WAIT_MSG_TYPE, 
HandshakeWaitMessage::new, new HandshakeWaitMessageSerializer());
         factory.register((short)0, GridJobCancelRequest::new, new 
GridJobCancelRequestSerializer());
         factory.register((short)1, GridJobExecuteRequest::new);
-        factory.register((short)2, GridJobExecuteResponse::new);
+        factory.register((short)2, GridJobExecuteResponse::new, new 
GridJobExecuteResponseSerializer());
         factory.register((short)3, GridJobSiblingsRequest::new, new 
GridJobSiblingsRequestSerializer());
         factory.register((short)4, GridJobSiblingsResponse::new);
         factory.register((short)5, GridTaskCancelRequest::new, new 
GridTaskCancelRequestSerializer());
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index bcbaaaf96f0..aa005edfd2e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -1674,15 +1674,15 @@ public class GridJobProcessor extends 
GridProcessorAdapter {
                 locNodeId,
                 req.getSessionId(),
                 req.getJobId(),
-                loc ? null : U.marshal(marsh, ex),
                 ex,
-                loc ? null : U.marshal(marsh, null),
                 null,
-                loc ? null : U.marshal(marsh, null),
                 null,
                 false,
                 null);
 
+            if (!loc)
+                jobRes.marshallUserData(marsh, log);
+
             if (req.isSessionFullSupport()) {
                 // Send response to designated job topic.
                 // Always go through communication to preserve order,
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index 84a36b30126..4fd7eff87e9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.job;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -898,64 +897,10 @@ public class GridJobWorker extends GridWorker implements 
GridTimeoutObject {
                     }
                     else {
                         try {
-                            byte[] resBytes = null;
-                            byte[] exBytes = null;
-                            byte[] attrBytes = null;
-
                             boolean loc = 
ctx.localNodeId().equals(sndNode.id()) && !ctx.config().isMarshalLocalJobs();
 
                             Map<Object, Object> attrs = jobCtx.getAttributes();
 
-                            // Try to serialize response, and if exception - 
return to client.
-                            if (!loc) {
-                                try {
-                                    resBytes = U.marshal(marsh, res);
-                                }
-                                catch (IgniteCheckedException e) {
-                                    resBytes = U.marshal(marsh, null);
-
-                                    if (ex != null)
-                                        ex.addSuppressed(e);
-                                    else
-                                        ex = U.convertException(e);
-
-                                    logError("Failed to serialize job response 
[nodeId=" + taskNode.id() +
-                                        ", ses=" + ses + ", jobId=" + 
ses.getJobId() + ", job=" + job +
-                                        ", resCls=" + (res == null ? null : 
res.getClass()) + ']', e);
-                                }
-
-                                try {
-                                    attrBytes = U.marshal(marsh, attrs);
-                                }
-                                catch (IgniteCheckedException e) {
-                                    attrBytes = U.marshal(marsh, 
Collections.emptyMap());
-
-                                    if (ex != null)
-                                        ex.addSuppressed(e);
-                                    else
-                                        ex = U.convertException(e);
-
-                                    logError("Failed to serialize job 
attributes [nodeId=" + taskNode.id() +
-                                        ", ses=" + ses + ", jobId=" + 
ses.getJobId() + ", job=" + job +
-                                        ", attrs=" + attrs + ']', e);
-                                }
-
-                                try {
-                                    exBytes = U.marshal(marsh, ex);
-                                }
-                                catch (IgniteCheckedException e) {
-                                    String msg = "Failed to serialize job 
exception [nodeId=" + taskNode.id() +
-                                        ", ses=" + ses + ", jobId=" + 
ses.getJobId() + ", job=" + job +
-                                        ", msg=\"" + e.getMessage() + "\"]";
-
-                                    ex = new IgniteException(msg);
-
-                                    logError(msg, e);
-
-                                    exBytes = U.marshal(marsh, ex);
-                                }
-                            }
-
                             if (ex != null) {
                                 status = FAILED;
 
@@ -980,15 +925,15 @@ public class GridJobWorker extends GridWorker implements 
GridTimeoutObject {
                                 ctx.localNodeId(),
                                 ses.getId(),
                                 ses.getJobId(),
-                                exBytes,
-                                loc ? ex : null,
-                                resBytes,
-                                loc ? res : null,
-                                attrBytes,
-                                loc ? attrs : null,
+                                ex,
+                                res,
+                                attrs,
                                 isCancelled(),
                                 retry ? 
ctx.cache().context().exchange().readyAffinityVersion() : null);
 
+                            if (!loc)
+                                jobRes.marshallUserData(marsh, log);
+
                             long timeout = ses.getEndTime() - 
U.currentTimeMillis();
 
                             if (timeout <= 0)
@@ -1009,9 +954,10 @@ public class GridJobWorker extends GridWorker implements 
GridTimeoutObject {
                             }
                             else if (ctx.localNodeId().equals(sndNode.id()))
                                 
ctx.task().processJobExecuteResponse(ctx.localNodeId(), jobRes);
-                            else
+                            else {
                                 // Send response to common topic as unordered 
message.
                                 ctx.io().sendToGridTopic(sndNode, TOPIC_TASK, 
jobRes, internal ? MANAGEMENT_POOL : SYSTEM_POOL);
+                            }
                         }
                         catch (IgniteCheckedException e) {
                             // Log and invoke the master-leave callback.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 4e15b5c12ba..b6411017fc9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -1057,7 +1057,7 @@ public class GridTaskProcessor extends 
GridProcessorAdapter implements IgniteCha
         lock.readLock();
 
         try {
-            GridTaskWorker<?, ?> task = tasks.get(msg.getSessionId());
+            GridTaskWorker<?, ?> task = tasks.get(msg.sessionId());
 
             if (stopping && !waiting) {
                 U.warn(log, "Received job execution response while stopping 
grid (will ignore): " + msg
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 486dcdf85c5..62e9944210e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -757,7 +757,7 @@ public class GridTaskWorker<T, R> extends GridWorker 
implements GridTimeoutObjec
                         return;
                     }
 
-                    jobRes = this.jobRes.get(res.getJobId());
+                    jobRes = this.jobRes.get(res.jobId());
 
                     if (jobRes == null) {
                         if (log.isDebugEnabled())
@@ -782,7 +782,7 @@ public class GridTaskWorker<T, R> extends GridWorker 
implements GridTimeoutObjec
                         continue;
                     }
 
-                    if (!jobRes.getNode().id().equals(res.getNodeId())) {
+                    if (!jobRes.getNode().id().equals(res.nodeId())) {
                         if (log.isDebugEnabled())
                             log.debug("Ignoring stale response as job was 
already resent to other node [res=" + res +
                                 ", jobRes=" + jobRes + ']');
@@ -821,29 +821,19 @@ public class GridTaskWorker<T, R> extends GridWorker 
implements GridTimeoutObjec
 
                     // We don't keep reference to job if results are not 
cached.
                     if (!resCache)
-                        this.jobRes.remove(res.getJobId());
+                        this.jobRes.remove(res.jobId());
                 }
 
                 if (res.getFakeException() != null)
                     jobRes.onResponse(null, res.getFakeException(), null, 
false);
                 else {
-                    ClassLoader clsLdr = dep.classLoader();
-
                     try {
-                        boolean loc = 
ctx.localNodeId().equals(res.getNodeId()) && !ctx.config().isMarshalLocalJobs();
-
-                        Object res0 = loc ? res.getJobResult() : 
U.unmarshal(marsh, res.getJobResultBytes(),
-                            U.resolveClassLoader(clsLdr, ctx.config()));
-
-                        IgniteException ex = loc ? res.getException() :
-                            U.<IgniteException>unmarshal(marsh, 
res.getExceptionBytes(),
-                                U.resolveClassLoader(clsLdr, ctx.config()));
+                        boolean loc = ctx.localNodeId().equals(res.nodeId()) 
&& !ctx.config().isMarshalLocalJobs();
 
-                        Map<Object, Object> attrs = loc ? 
res.getJobAttributes() :
-                            U.<Map<Object, Object>>unmarshal(marsh, 
res.getJobAttributesBytes(),
-                                U.resolveClassLoader(clsLdr, ctx.config()));
+                        if (!loc)
+                            res.unmarshallUserData(marsh, 
U.resolveClassLoader(dep.classLoader(), ctx.config()));
 
-                        jobRes.onResponse(res0, ex, attrs, res.isCancelled());
+                        jobRes.onResponse(res.getJobResult(), res.exception(), 
res.getJobAttributes(), res.cancelled());
 
                         if (loc)
                             ctx.resource().invokeAnnotated(dep, 
jobRes.getJob(), ComputeJobAfterSend.class);
@@ -894,7 +884,7 @@ public class GridTaskWorker<T, R> extends GridWorker 
implements GridTimeoutObjec
                         assert affCacheIds != null;
                         retry = true;
 
-                        mapTopVer = U.max(res.getRetryTopologyVersion(), 
ctx.cache().context().exchange().readyAffinityVersion());
+                        mapTopVer = U.max(res.retryTopologyVersion(), 
ctx.cache().context().exchange().readyAffinityVersion());
                         affFut = 
ctx.cache().context().exchange().lastTopologyFuture();
 
                         if (affFut != null && !affFut.isDone()) {
@@ -1270,7 +1260,7 @@ public class GridTaskWorker<T, R> extends GridWorker 
implements GridTimeoutObjec
 
             if (!resCache) {
                 // Store result back in map before sending.
-                this.jobRes.put(res.getJobId(), jobRes);
+                this.jobRes.put(res.jobId(), jobRes);
             }
         }
 
@@ -1378,7 +1368,7 @@ public class GridTaskWorker<T, R> extends GridWorker 
implements GridTimeoutObjec
                 ctx.resource().invokeAnnotated(dep, res.getJob(), 
ComputeJobAfterSend.class);
 
                 GridJobExecuteResponse fakeRes = new 
GridJobExecuteResponse(node.id(), ses.getId(),
-                    res.getJobContext().getJobId(), null, null, null, null, 
null, null, false, null);
+                    res.getJobContext().getJobId(), null, null, null, false, 
null);
 
                 fakeRes.setFakeException(new ClusterTopologyException("Failed 
to send job due to node failure: " + node));
 
@@ -1491,7 +1481,7 @@ public class GridTaskWorker<T, R> extends GridWorker 
implements GridTimeoutObjec
             }
 
             GridJobExecuteResponse fakeRes = new 
GridJobExecuteResponse(node.id(), ses.getId(),
-                res.getJobContext().getJobId(), null, null, null, null, null, 
null, false, null);
+                res.getJobContext().getJobId(), null, null, null, false, null);
 
             if (fakeErr == null)
                 fakeErr = U.convertException(e);
@@ -1523,7 +1513,7 @@ public class GridTaskWorker<T, R> extends GridWorker 
implements GridTimeoutObjec
                         // Artificial response in case if a job is waiting for 
a response from
                         // non-existent node.
                         GridJobExecuteResponse fakeRes = new 
GridJobExecuteResponse(nodeId, ses.getId(),
-                            jr.getJobContext().getJobId(), null, null, null, 
null, null, null, false, null);
+                            jr.getJobContext().getJobId(), null, null, null, 
false, null);
 
                         fakeRes.setFakeException(new 
ClusterTopologyException("Node has left grid: " + nodeId));
 

Reply via email to