This is an automated email from the ASF dual-hosted git repository.
shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 669372fa419 IGNITE-26817 Refactor GridJobExecuteResponse (#12474)
669372fa419 is described below
commit 669372fa4198eb36527ac073bcf05e92d78cb78d
Author: Vladimir Steshin <[email protected]>
AuthorDate: Tue Nov 4 21:39:56 2025 +0300
IGNITE-26817 Refactor GridJobExecuteResponse (#12474)
---
.../ignite/internal/GridJobExecuteResponse.java | 307 ++++++++++-----------
.../communication/GridIoMessageFactory.java | 3 +-
.../internal/processors/job/GridJobProcessor.java | 6 +-
.../internal/processors/job/GridJobWorker.java | 70 +----
.../processors/task/GridTaskProcessor.java | 2 +-
.../internal/processors/task/GridTaskWorker.java | 34 +--
6 files changed, 169 insertions(+), 253 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java
b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java
index be468be29a1..0ede431ce19 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java
@@ -17,17 +17,21 @@
package org.apache.ignite.internal;
-import java.nio.ByteBuffer;
import java.util.Map;
import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.managers.communication.ErrorMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
/**
@@ -35,44 +39,52 @@ import org.jetbrains.annotations.Nullable;
*/
public class GridJobExecuteResponse implements Message {
/** */
+ @Order(0)
private UUID nodeId;
/** */
+ @Order(value = 1, method = "sessionId")
private IgniteUuid sesId;
/** */
+ @Order(2)
private IgniteUuid jobId;
/** */
- private byte[] gridExBytes;
-
- /** */
- @GridDirectTransient
private IgniteException gridEx;
- /** */
- private byte[] resBytes;
+ /**
+ * Serialization call holder for {@code gridEx}. Works with {@link
#marshallUserData(Marshaller, IgniteLogger)}.
+ * Also wraps a possible serialization error.
+ */
+ @Order(value = 3, method = "exceptionMessage")
+ private @Nullable ErrorMessage gridExMsg;
+
+ /** Job result serialization call holder. */
+ @Order(value = 4, method = "jobResultBytes")
+ private @Nullable byte[] resBytes;
/** */
- @GridDirectTransient
- private Object res;
+ private @Nullable Object res;
/** */
+ /** Job attributes serialization call holder. */
+ @Order(value = 5, method = "jobAttrubutesBytes")
private byte[] jobAttrsBytes;
/** */
- @GridDirectTransient
private Map<Object, Object> jobAttrs;
/** */
+ @Order(value = 6, method = "cancelled")
private boolean isCancelled;
/** */
@GridToStringExclude
- @GridDirectTransient
private IgniteException fakeEx;
- /** */
+ /** Retry topology version. */
+ @Order(value = 7, method = "retryTopologyVersion")
private AffinityTopologyVersion retry;
/**
@@ -86,11 +98,8 @@ public class GridJobExecuteResponse implements Message {
* @param nodeId Sender node ID.
* @param sesId Task session ID
* @param jobId Job ID.
- * @param gridExBytes Serialized grid exception.
* @param gridEx Grid exception.
- * @param resBytes Serialized result.
* @param res Result.
- * @param jobAttrsBytes Serialized job attributes.
* @param jobAttrs Job attributes.
* @param isCancelled Whether job was cancelled or not.
* @param retry Topology version for that partitions haven't been reserved
on the affinity node.
@@ -98,11 +107,8 @@ public class GridJobExecuteResponse implements Message {
public GridJobExecuteResponse(UUID nodeId,
IgniteUuid sesId,
IgniteUuid jobId,
- byte[] gridExBytes,
- IgniteException gridEx,
- byte[] resBytes,
- Object res,
- byte[] jobAttrsBytes,
+ @Nullable IgniteException gridEx,
+ @Nullable Object res,
Map<Object, Object> jobAttrs,
boolean isCancelled,
AffinityTopologyVersion retry
@@ -114,37 +120,49 @@ public class GridJobExecuteResponse implements Message {
this.nodeId = nodeId;
this.sesId = sesId;
this.jobId = jobId;
- this.gridExBytes = gridExBytes;
- this.gridEx = gridEx;
- this.resBytes = resBytes;
this.res = res;
- this.jobAttrsBytes = jobAttrsBytes;
this.jobAttrs = jobAttrs;
this.isCancelled = isCancelled;
this.retry = retry;
+ this.gridEx = gridEx;
}
/**
* @return Task session ID.
*/
- public IgniteUuid getSessionId() {
+ public IgniteUuid sessionId() {
return sesId;
}
+ /** */
+ public void sessionId(IgniteUuid sesId) {
+ this.sesId = sesId;
+ }
+
/**
* @return Job ID.
*/
- public IgniteUuid getJobId() {
+ public IgniteUuid jobId() {
return jobId;
}
+ /** */
+ public void jobId(IgniteUuid jobId) {
+ this.jobId = jobId;
+ }
+
/**
* @return Serialized job result.
*/
- @Nullable public byte[] getJobResultBytes() {
+ @Nullable public byte[] jobResultBytes() {
return resBytes;
}
+ /** */
+ public void jobResultBytes(@Nullable byte[] resBytes) {
+ this.resBytes = resBytes;
+ }
+
/**
* @return Job result.
*/
@@ -152,27 +170,43 @@ public class GridJobExecuteResponse implements Message {
return res;
}
- /**
- * @return Serialized job exception.
- */
- @Nullable public byte[] getExceptionBytes() {
- return gridExBytes;
- }
-
/**
* @return Job exception.
*/
- @Nullable public IgniteException getException() {
+ @Nullable public IgniteException exception() {
return gridEx;
}
+ /** */
+ public void exceptionMessage(@Nullable ErrorMessage gridExMsg) {
+ if (gridExMsg == null) {
+ gridEx = null;
+
+ return;
+ }
+
+ Throwable t = gridExMsg.error();
+
+ gridEx = t instanceof IgniteException ? (IgniteException)t : new
IgniteException(t);
+ }
+
+ /** */
+ public @Nullable ErrorMessage exceptionMessage() {
+ return gridEx == null ? null : new ErrorMessage(gridEx);
+ }
+
/**
* @return Serialized job attributes.
*/
- @Nullable public byte[] getJobAttributesBytes() {
+ @Nullable public byte[] jobAttrubutesBytes() {
return jobAttrsBytes;
}
+ /** */
+ public void jobAttrubutesBytes(@Nullable byte[] jobAttrsBytes) {
+ this.jobAttrsBytes = jobAttrsBytes;
+ }
+
/**
* @return Job attributes.
*/
@@ -183,17 +217,27 @@ public class GridJobExecuteResponse implements Message {
/**
* @return Job cancellation status.
*/
- public boolean isCancelled() {
+ public boolean cancelled() {
return isCancelled;
}
+ /** */
+ public void cancelled(boolean cancelled) {
+ isCancelled = cancelled;
+ }
+
/**
* @return Sender node ID.
*/
- public UUID getNodeId() {
+ public UUID nodeId() {
return nodeId;
}
+ /** */
+ public void nodeId(UUID nodeId) {
+ this.nodeId = nodeId;
+ }
+
/**
* @return Fake exception.
*/
@@ -219,147 +263,82 @@ public class GridJobExecuteResponse implements Message {
* @return Topology version for that specified partitions haven't been
reserved
* on the affinity node.
*/
- public AffinityTopologyVersion getRetryTopologyVersion() {
- return retry != null ? retry : AffinityTopologyVersion.NONE;
+ public @Nullable AffinityTopologyVersion retryTopologyVersion() {
+ return retry;
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
+ /** */
+ public void retryTopologyVersion(@Nullable AffinityTopologyVersion retry) {
+ this.retry = retry;
+ }
- writer.onHeaderWritten();
+ /**
+ * Serializes the job result and job attributes to byte[] with the provided marshaller; a failure is kept as the job exception.
+ * Erases non-marshalled data like {@link #getJobAttributes()} or {@link
#getJobResult()}.
+ */
+ public void marshallUserData(Marshaller marsh, @Nullable IgniteLogger log)
{
+ if (res != null) {
+ try {
+ resBytes = U.marshal(marsh, res);
+ }
+ catch (IgniteCheckedException e) {
+ resBytes = null;
+
+ String msg = "Failed to serialize job response [nodeId=" +
nodeId +
+ ", ses=" + sesId + ", jobId=" + jobId +
+ ", resCls=" + (res == null ? null : res.getClass()) + ']';
+
+ wrapSerializationError(e, msg, log);
+ }
+
+ res = null;
}
- switch (writer.state()) {
- case 0:
- if (!writer.writeByteArray(gridExBytes))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeBoolean(isCancelled))
- return false;
-
- writer.incrementState();
-
- case 2:
- if (!writer.writeByteArray(jobAttrsBytes))
- return false;
-
- writer.incrementState();
-
- case 3:
- if (!writer.writeIgniteUuid(jobId))
- return false;
+ if (!F.isEmpty(jobAttrs)) {
+ try {
+ jobAttrsBytes = U.marshal(marsh, jobAttrs);
+ }
+ catch (IgniteCheckedException e) {
+ jobAttrsBytes = null;
- writer.incrementState();
+ String msg = "Failed to serialize job attributes [nodeId=" +
nodeId +
+ ", ses=" + sesId + ", jobId=" + jobId +
+ ", attrs=" + jobAttrs + ']';
- case 4:
- if (!writer.writeUuid(nodeId))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeByteArray(resBytes))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeAffinityTopologyVersion(retry))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeIgniteUuid(sesId))
- return false;
-
- writer.incrementState();
+ wrapSerializationError(e, msg, log);
+ }
+ jobAttrs = null;
}
-
- return true;
}
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- switch (reader.state()) {
- case 0:
- gridExBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- isCancelled = reader.readBoolean();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 2:
- jobAttrsBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 3:
- jobId = reader.readIgniteUuid();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 4:
- nodeId = reader.readUuid();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- resBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- retry = reader.readAffinityTopologyVersion();
-
- if (!reader.isLastRead())
- return false;
+ /**
+ * Deserializes user data from byte[] with provided marshaller and class
loader.
+ * Erases marshalled data like {@link #jobAttrubutesBytes()} or {@link
#jobResultBytes()}.
+ */
+ public void unmarshallUserData(Marshaller marshaller, ClassLoader clsLdr)
throws IgniteCheckedException {
+ if (jobAttrsBytes != null) {
+ jobAttrs = U.unmarshal(marshaller, jobAttrsBytes, clsLdr);
- reader.incrementState();
+ jobAttrsBytes = null;
+ }
- case 7:
- sesId = reader.readIgniteUuid();
+ if (resBytes != null) {
+ res = U.unmarshal(marshaller, resBytes, clsLdr);
- if (!reader.isLastRead())
- return false;
+ resBytes = null;
+ }
+ }
- reader.incrementState();
+ /** */
+ private void wrapSerializationError(IgniteCheckedException e, String msg,
@Nullable IgniteLogger log) {
+ if (gridEx != null)
+ e.addSuppressed(gridEx);
- }
+ gridEx = U.convertException(e);
- return true;
+ if (log != null && (log.isDebugEnabled() || !X.hasCause(e,
NodeStoppingException.class)))
+ U.error(log, msg, e);
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 09064df08e8..cf07a3fcfb8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -76,6 +76,7 @@ import
org.apache.ignite.internal.codegen.GridDistributedTxFinishResponseSeriali
import
org.apache.ignite.internal.codegen.GridDistributedTxPrepareRequestSerializer;
import
org.apache.ignite.internal.codegen.GridDistributedTxPrepareResponseSerializer;
import org.apache.ignite.internal.codegen.GridJobCancelRequestSerializer;
+import org.apache.ignite.internal.codegen.GridJobExecuteResponseSerializer;
import org.apache.ignite.internal.codegen.GridJobSiblingsRequestSerializer;
import
org.apache.ignite.internal.codegen.GridNearAtomicCheckUpdateRequestSerializer;
import
org.apache.ignite.internal.codegen.GridNearAtomicSingleUpdateFilterRequestSerializer;
@@ -310,7 +311,7 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register(TcpCommunicationSpi.HANDSHAKE_WAIT_MSG_TYPE,
HandshakeWaitMessage::new, new HandshakeWaitMessageSerializer());
factory.register((short)0, GridJobCancelRequest::new, new
GridJobCancelRequestSerializer());
factory.register((short)1, GridJobExecuteRequest::new);
- factory.register((short)2, GridJobExecuteResponse::new);
+ factory.register((short)2, GridJobExecuteResponse::new, new
GridJobExecuteResponseSerializer());
factory.register((short)3, GridJobSiblingsRequest::new, new
GridJobSiblingsRequestSerializer());
factory.register((short)4, GridJobSiblingsResponse::new);
factory.register((short)5, GridTaskCancelRequest::new, new
GridTaskCancelRequestSerializer());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index bcbaaaf96f0..aa005edfd2e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -1674,15 +1674,15 @@ public class GridJobProcessor extends
GridProcessorAdapter {
locNodeId,
req.getSessionId(),
req.getJobId(),
- loc ? null : U.marshal(marsh, ex),
ex,
- loc ? null : U.marshal(marsh, null),
null,
- loc ? null : U.marshal(marsh, null),
null,
false,
null);
+ if (!loc)
+ jobRes.marshallUserData(marsh, log);
+
if (req.isSessionFullSupport()) {
// Send response to designated job topic.
// Always go through communication to preserve order,
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index 84a36b30126..4fd7eff87e9 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.job;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
@@ -898,64 +897,10 @@ public class GridJobWorker extends GridWorker implements
GridTimeoutObject {
}
else {
try {
- byte[] resBytes = null;
- byte[] exBytes = null;
- byte[] attrBytes = null;
-
boolean loc =
ctx.localNodeId().equals(sndNode.id()) && !ctx.config().isMarshalLocalJobs();
Map<Object, Object> attrs = jobCtx.getAttributes();
- // Try to serialize response, and if exception -
return to client.
- if (!loc) {
- try {
- resBytes = U.marshal(marsh, res);
- }
- catch (IgniteCheckedException e) {
- resBytes = U.marshal(marsh, null);
-
- if (ex != null)
- ex.addSuppressed(e);
- else
- ex = U.convertException(e);
-
- logError("Failed to serialize job response
[nodeId=" + taskNode.id() +
- ", ses=" + ses + ", jobId=" +
ses.getJobId() + ", job=" + job +
- ", resCls=" + (res == null ? null :
res.getClass()) + ']', e);
- }
-
- try {
- attrBytes = U.marshal(marsh, attrs);
- }
- catch (IgniteCheckedException e) {
- attrBytes = U.marshal(marsh,
Collections.emptyMap());
-
- if (ex != null)
- ex.addSuppressed(e);
- else
- ex = U.convertException(e);
-
- logError("Failed to serialize job
attributes [nodeId=" + taskNode.id() +
- ", ses=" + ses + ", jobId=" +
ses.getJobId() + ", job=" + job +
- ", attrs=" + attrs + ']', e);
- }
-
- try {
- exBytes = U.marshal(marsh, ex);
- }
- catch (IgniteCheckedException e) {
- String msg = "Failed to serialize job
exception [nodeId=" + taskNode.id() +
- ", ses=" + ses + ", jobId=" +
ses.getJobId() + ", job=" + job +
- ", msg=\"" + e.getMessage() + "\"]";
-
- ex = new IgniteException(msg);
-
- logError(msg, e);
-
- exBytes = U.marshal(marsh, ex);
- }
- }
-
if (ex != null) {
status = FAILED;
@@ -980,15 +925,15 @@ public class GridJobWorker extends GridWorker implements
GridTimeoutObject {
ctx.localNodeId(),
ses.getId(),
ses.getJobId(),
- exBytes,
- loc ? ex : null,
- resBytes,
- loc ? res : null,
- attrBytes,
- loc ? attrs : null,
+ ex,
+ res,
+ attrs,
isCancelled(),
retry ?
ctx.cache().context().exchange().readyAffinityVersion() : null);
+ if (!loc)
+ jobRes.marshallUserData(marsh, log);
+
long timeout = ses.getEndTime() -
U.currentTimeMillis();
if (timeout <= 0)
@@ -1009,9 +954,10 @@ public class GridJobWorker extends GridWorker implements
GridTimeoutObject {
}
else if (ctx.localNodeId().equals(sndNode.id()))
ctx.task().processJobExecuteResponse(ctx.localNodeId(), jobRes);
- else
+ else {
// Send response to common topic as unordered
message.
ctx.io().sendToGridTopic(sndNode, TOPIC_TASK,
jobRes, internal ? MANAGEMENT_POOL : SYSTEM_POOL);
+ }
}
catch (IgniteCheckedException e) {
// Log and invoke the master-leave callback.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 4e15b5c12ba..b6411017fc9 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -1057,7 +1057,7 @@ public class GridTaskProcessor extends
GridProcessorAdapter implements IgniteCha
lock.readLock();
try {
- GridTaskWorker<?, ?> task = tasks.get(msg.getSessionId());
+ GridTaskWorker<?, ?> task = tasks.get(msg.sessionId());
if (stopping && !waiting) {
U.warn(log, "Received job execution response while stopping
grid (will ignore): " + msg
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 486dcdf85c5..62e9944210e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -757,7 +757,7 @@ public class GridTaskWorker<T, R> extends GridWorker
implements GridTimeoutObjec
return;
}
- jobRes = this.jobRes.get(res.getJobId());
+ jobRes = this.jobRes.get(res.jobId());
if (jobRes == null) {
if (log.isDebugEnabled())
@@ -782,7 +782,7 @@ public class GridTaskWorker<T, R> extends GridWorker
implements GridTimeoutObjec
continue;
}
- if (!jobRes.getNode().id().equals(res.getNodeId())) {
+ if (!jobRes.getNode().id().equals(res.nodeId())) {
if (log.isDebugEnabled())
log.debug("Ignoring stale response as job was
already resent to other node [res=" + res +
", jobRes=" + jobRes + ']');
@@ -821,29 +821,19 @@ public class GridTaskWorker<T, R> extends GridWorker
implements GridTimeoutObjec
// We don't keep reference to job if results are not
cached.
if (!resCache)
- this.jobRes.remove(res.getJobId());
+ this.jobRes.remove(res.jobId());
}
if (res.getFakeException() != null)
jobRes.onResponse(null, res.getFakeException(), null,
false);
else {
- ClassLoader clsLdr = dep.classLoader();
-
try {
- boolean loc =
ctx.localNodeId().equals(res.getNodeId()) && !ctx.config().isMarshalLocalJobs();
-
- Object res0 = loc ? res.getJobResult() :
U.unmarshal(marsh, res.getJobResultBytes(),
- U.resolveClassLoader(clsLdr, ctx.config()));
-
- IgniteException ex = loc ? res.getException() :
- U.<IgniteException>unmarshal(marsh,
res.getExceptionBytes(),
- U.resolveClassLoader(clsLdr, ctx.config()));
+ boolean loc = ctx.localNodeId().equals(res.nodeId())
&& !ctx.config().isMarshalLocalJobs();
- Map<Object, Object> attrs = loc ?
res.getJobAttributes() :
- U.<Map<Object, Object>>unmarshal(marsh,
res.getJobAttributesBytes(),
- U.resolveClassLoader(clsLdr, ctx.config()));
+ if (!loc)
+ res.unmarshallUserData(marsh,
U.resolveClassLoader(dep.classLoader(), ctx.config()));
- jobRes.onResponse(res0, ex, attrs, res.isCancelled());
+ jobRes.onResponse(res.getJobResult(), res.exception(),
res.getJobAttributes(), res.cancelled());
if (loc)
ctx.resource().invokeAnnotated(dep,
jobRes.getJob(), ComputeJobAfterSend.class);
@@ -894,7 +884,7 @@ public class GridTaskWorker<T, R> extends GridWorker
implements GridTimeoutObjec
assert affCacheIds != null;
retry = true;
- mapTopVer = U.max(res.getRetryTopologyVersion(),
ctx.cache().context().exchange().readyAffinityVersion());
+ mapTopVer = U.max(res.retryTopologyVersion(),
ctx.cache().context().exchange().readyAffinityVersion());
affFut =
ctx.cache().context().exchange().lastTopologyFuture();
if (affFut != null && !affFut.isDone()) {
@@ -1270,7 +1260,7 @@ public class GridTaskWorker<T, R> extends GridWorker
implements GridTimeoutObjec
if (!resCache) {
// Store result back in map before sending.
- this.jobRes.put(res.getJobId(), jobRes);
+ this.jobRes.put(res.jobId(), jobRes);
}
}
@@ -1378,7 +1368,7 @@ public class GridTaskWorker<T, R> extends GridWorker
implements GridTimeoutObjec
ctx.resource().invokeAnnotated(dep, res.getJob(),
ComputeJobAfterSend.class);
GridJobExecuteResponse fakeRes = new
GridJobExecuteResponse(node.id(), ses.getId(),
- res.getJobContext().getJobId(), null, null, null, null,
null, null, false, null);
+ res.getJobContext().getJobId(), null, null, null, false,
null);
fakeRes.setFakeException(new ClusterTopologyException("Failed
to send job due to node failure: " + node));
@@ -1491,7 +1481,7 @@ public class GridTaskWorker<T, R> extends GridWorker
implements GridTimeoutObjec
}
GridJobExecuteResponse fakeRes = new
GridJobExecuteResponse(node.id(), ses.getId(),
- res.getJobContext().getJobId(), null, null, null, null, null,
null, false, null);
+ res.getJobContext().getJobId(), null, null, null, false, null);
if (fakeErr == null)
fakeErr = U.convertException(e);
@@ -1523,7 +1513,7 @@ public class GridTaskWorker<T, R> extends GridWorker
implements GridTimeoutObjec
// Artificial response in case if a job is waiting for
a response from
// non-existent node.
GridJobExecuteResponse fakeRes = new
GridJobExecuteResponse(nodeId, ses.getId(),
- jr.getJobContext().getJobId(), null, null, null,
null, null, null, false, null);
+ jr.getJobContext().getJobId(), null, null, null,
false, null);
fakeRes.setFakeException(new
ClusterTopologyException("Node has left grid: " + nodeId));