This is an automated email from the ASF dual-hosted git repository.
shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new a53436df7ab IGNITE-25910 Use MessageSerializer for
GridJobExecuteRequest (#12498)
a53436df7ab is described below
commit a53436df7ab7cf4cb5d8b58c68655178150c38fa
Author: Ilya Shishkov <[email protected]>
AuthorDate: Wed Nov 19 12:47:00 2025 +0300
IGNITE-25910 Use MessageSerializer for GridJobExecuteRequest (#12498)
---
.../ignite/internal/GridJobExecuteRequest.java | 729 ++++++++-------------
.../communication/DeploymentModeMessage.java | 113 ++++
.../communication/GridIoMessageFactory.java | 5 +-
.../internal/processors/job/GridJobProcessor.java | 151 ++---
.../internal/processors/job/GridJobWorker.java | 25 +-
.../internal/processors/task/GridTaskWorker.java | 67 +-
.../apache/ignite/marshaller/MarshallerUtils.java | 43 --
.../internal/client/thin/ServiceAwarenessTest.java | 2 +-
.../communication/DeploymentModeMessageTest.java | 89 +++
...CacheLongRunningTransactionDiagnosticsTest.java | 2 +-
.../ignite/testsuites/IgniteBasicTestSuite.java | 4 +-
11 files changed, 583 insertions(+), 647 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
index 1ba24925810..27f3e6aa683 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
@@ -18,14 +18,15 @@
package org.apache.ignite.internal;
import java.io.Serializable;
-import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobSibling;
import org.apache.ignite.configuration.DeploymentMode;
+import org.apache.ignite.internal.managers.communication.DeploymentModeMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -33,123 +34,137 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
-import
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.marshaller.Marshaller;
import org.jetbrains.annotations.Nullable;
-import static
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType.IGNITE_UUID;
-
/**
* Job execution request.
*/
+@SuppressWarnings({"AssignmentOrReturnOfFieldWithMutableType",
"NullableProblems"})
public class GridJobExecuteRequest implements ExecutorAwareMessage {
/** */
+ @Order(value = 0, method = "sessionId")
private IgniteUuid sesId;
/** */
+ @Order(1)
private IgniteUuid jobId;
/** */
@GridToStringExclude
+ @Order(2)
private byte[] jobBytes;
/** */
@GridToStringExclude
- @GridDirectTransient
private ComputeJob job;
/** */
+ @Order(3)
private long startTaskTime;
/** */
+ @Order(4)
private long timeout;
/** */
+ @Order(5)
private String taskName;
/** */
+ @Order(value = 6, method = "userVersion")
private String userVer;
/** */
+ @Order(value = 7, method = "taskClassName")
private String taskClsName;
/** Node class loader participants. */
@GridToStringInclude
- @GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class)
+ @Order(value = 8, method = "loaderParticipants")
private Map<UUID, IgniteUuid> ldrParticipants;
/** */
@GridToStringExclude
+ @Order(value = 9, method = "sessionAttributesBytes")
private byte[] sesAttrsBytes;
/** */
@GridToStringExclude
- @GridDirectTransient
private Map<Object, Object> sesAttrs;
/** */
@GridToStringExclude
+ @Order(value = 10, method = "jobAttributesBytes")
private byte[] jobAttrsBytes;
/** */
@GridToStringExclude
- @GridDirectTransient
private Map<? extends Serializable, ? extends Serializable> jobAttrs;
/** Checkpoint SPI name. */
+ @Order(value = 11, method = "checkpointSpi")
private String cpSpi;
/** */
- @GridDirectTransient
private Collection<ComputeJobSibling> siblings;
/** */
+ @Order(12)
private byte[] siblingsBytes;
/** Transient since needs to hold local creation time. */
- @GridDirectTransient
- private long createTime = U.currentTimeMillis();
+ private final long createTime = U.currentTimeMillis();
/** */
+ @Order(value = 13, method = "classLoaderId")
private IgniteUuid clsLdrId;
/** */
- private DeploymentMode depMode;
+ @Order(value = 14, method = "deploymentModeMessage")
+ private DeploymentModeMessage depModeMsg;
/** */
+ @Order(15)
private boolean dynamicSiblings;
/** */
+ @Order(value = 16, method = "forceLocalDeployment")
private boolean forceLocDep;
/** */
+ @Order(value = 17, method = "sessionFullSupport")
private boolean sesFullSup;
/** */
+ @Order(18)
private boolean internal;
/** */
- @GridDirectCollection(UUID.class)
+ @Order(value = 19, method = "topology")
private Collection<UUID> top;
/** */
- @GridDirectTransient
private IgnitePredicate<ClusterNode> topPred;
/** */
+ @Order(value = 20, method = "topologyPredicateBytes")
private byte[] topPredBytes;
/** */
- private int[] idsOfCaches;
+ @Order(21)
+ private int[] cacheIds;
/** */
+ @Order(value = 22, method = "partition")
private int part;
/** */
+ @Order(value = 23, method = "topologyVersion")
private AffinityTopologyVersion topVer;
/** */
+ @Order(value = 24, method = "executorName")
private String execName;
/**
@@ -165,18 +180,13 @@ public class GridJobExecuteRequest implements
ExecutorAwareMessage {
* @param taskName Task name.
* @param userVer Code version.
* @param taskClsName Fully qualified task name.
- * @param jobBytes Job serialized body.
* @param job Job.
* @param startTaskTime Task execution start time.
* @param timeout Task execution timeout.
* @param top Topology.
* @param topPred Topology predicate.
- * @param topPredBytes Marshalled topology predicate.
- * @param siblingsBytes Serialized collection of split siblings.
* @param siblings Collection of split siblings.
- * @param sesAttrsBytes Map of session attributes.
* @param sesAttrs Session attributes.
- * @param jobAttrsBytes Job context attributes.
* @param jobAttrs Job attributes.
* @param cpSpi Collision SPI.
* @param clsLdrId Task local class loader id.
@@ -197,18 +207,13 @@ public class GridJobExecuteRequest implements
ExecutorAwareMessage {
String taskName,
String userVer,
String taskClsName,
- byte[] jobBytes,
ComputeJob job,
long startTaskTime,
long timeout,
@Nullable Collection<UUID> top,
@Nullable IgnitePredicate<ClusterNode> topPred,
- byte[] topPredBytes,
- byte[] siblingsBytes,
Collection<ComputeJobSibling> siblings,
- byte[] sesAttrsBytes,
Map<Object, Object> sesAttrs,
- byte[] jobAttrsBytes,
Map<? extends Serializable, ? extends Serializable> jobAttrs,
String cpSpi,
IgniteUuid clsLdrId,
@@ -226,10 +231,10 @@ public class GridJobExecuteRequest implements
ExecutorAwareMessage {
assert jobId != null;
assert taskName != null;
assert taskClsName != null;
- assert job != null || jobBytes != null;
- assert sesAttrs != null || sesAttrsBytes != null || !sesFullSup;
- assert jobAttrs != null || jobAttrsBytes != null;
- assert top != null || topPred != null || topPredBytes != null;
+ assert job != null;
+ assert sesAttrs != null || !sesFullSup;
+ assert jobAttrs != null;
+ assert top != null || topPred != null;
assert clsLdrId != null;
assert userVer != null;
assert depMode != null;
@@ -239,28 +244,23 @@ public class GridJobExecuteRequest implements
ExecutorAwareMessage {
this.taskName = taskName;
this.userVer = userVer;
this.taskClsName = taskClsName;
- this.jobBytes = jobBytes;
this.job = job;
this.startTaskTime = startTaskTime;
this.timeout = timeout;
this.top = top;
this.topVer = topVer;
this.topPred = topPred;
- this.topPredBytes = topPredBytes;
- this.siblingsBytes = siblingsBytes;
this.siblings = siblings;
- this.sesAttrsBytes = sesAttrsBytes;
this.sesAttrs = sesAttrs;
- this.jobAttrsBytes = jobAttrsBytes;
this.jobAttrs = jobAttrs;
this.clsLdrId = clsLdrId;
- this.depMode = depMode;
+ depModeMsg = new DeploymentModeMessage(depMode);
this.dynamicSiblings = dynamicSiblings;
this.ldrParticipants = ldrParticipants;
this.forceLocDep = forceLocDep;
this.sesFullSup = sesFullSup;
this.internal = internal;
- this.idsOfCaches = cacheIds;
+ this.cacheIds = cacheIds;
this.part = part;
this.topVer = topVer;
this.execName = execName;
@@ -271,45 +271,87 @@ public class GridJobExecuteRequest implements
ExecutorAwareMessage {
/**
* @return Task session ID.
*/
- public IgniteUuid getSessionId() {
+ public IgniteUuid sessionId() {
return sesId;
}
+ /**
+ * @param sesId New task session ID.
+ */
+ public void sessionId(IgniteUuid sesId) {
+ this.sesId = sesId;
+ }
+
/**
* @return Job session ID.
*/
- public IgniteUuid getJobId() {
+ public IgniteUuid jobId() {
return jobId;
}
/**
- * @return Task version.
+ * @param jobId New job session ID.
*/
- public String getTaskClassName() {
+ public void jobId(IgniteUuid jobId) {
+ this.jobId = jobId;
+ }
+
+ /**
+ * @return Task class name.
+ */
+ public String taskClassName() {
return taskClsName;
}
+ /**
+ * @param taskClsName New task class name.
+ */
+ public void taskClassName(String taskClsName) {
+ this.taskClsName = taskClsName;
+ }
+
/**
* @return Task name.
*/
- public String getTaskName() {
+ public String taskName() {
return taskName;
}
+ /**
+ * @param taskName New task name.
+ */
+ public void taskName(String taskName) {
+ this.taskName = taskName;
+ }
+
/**
* @return Task version.
*/
- public String getUserVersion() {
+ public String userVersion() {
return userVer;
}
+ /**
+ * @param userVer New task version.
+ */
+ public void userVersion(String userVer) {
+ this.userVer = userVer;
+ }
+
/**
* @return Serialized job bytes.
*/
- public byte[] getJobBytes() {
+ public byte[] jobBytes() {
return jobBytes;
}
+ /**
+ * @param jobBytes New serialized job bytes.
+ */
+ public void jobBytes(byte[] jobBytes) {
+ this.jobBytes = jobBytes;
+ }
+
/**
* @return Grid job.
*/
@@ -320,17 +362,31 @@ public class GridJobExecuteRequest implements
ExecutorAwareMessage {
/**
* @return Task start time.
*/
- public long getStartTaskTime() {
+ public long startTaskTime() {
return startTaskTime;
}
+ /**
+ * @param startTaskTime New task start time.
+ */
+ public void startTaskTime(long startTaskTime) {
+ this.startTaskTime = startTaskTime;
+ }
+
/**
* @return Timeout.
*/
- public long getTimeout() {
+ public long timeout() {
return timeout;
}
+ /**
+ * @param timeout New timeout.
+ */
+ public void timeout(long timeout) {
+ this.timeout = timeout;
+ }
+
/**
* Gets this instance creation time.
*
@@ -343,10 +399,17 @@ public class GridJobExecuteRequest implements
ExecutorAwareMessage {
/**
* @return Serialized collection of split siblings.
*/
- public byte[] getSiblingsBytes() {
+ public byte[] siblingsBytes() {
return siblingsBytes;
}
+ /**
+ * @param siblingsBytes New serialized collection of split siblings.
+ */
+ public void siblingsBytes(byte[] siblingsBytes) {
+ this.siblingsBytes = siblingsBytes;
+ }
+
/**
* @return Job siblings.
*/
@@ -355,12 +418,19 @@ public class GridJobExecuteRequest implements
ExecutorAwareMessage {
}
/**
- * @return Session attributes.
+ * @return Serialized form of session attributes.
*/
- public byte[] getSessionAttributesBytes() {
+ public byte[] sessionAttributesBytes() {
return sesAttrsBytes;
}
+ /**
+ * @param sesAttrsBytes New serialized form of session attributes.
+ */
+ public void sessionAttributesBytes(byte[] sesAttrsBytes) {
+ this.sesAttrsBytes = sesAttrsBytes;
+ }
+
/**
* @return Session attributes.
*/
@@ -369,12 +439,19 @@ public class GridJobExecuteRequest implements
ExecutorAwareMessage {
}
/**
- * @return Job attributes.
+ * @return Serialized form of job attributes.
*/
- public byte[] getJobAttributesBytes() {
+ public byte[] jobAttributesBytes() {
return jobAttrsBytes;
}
+ /**
+ * @param jobAttrsBytes New serialized form of job attributes.
+ */
+ public void jobAttributesBytes(byte[] jobAttrsBytes) {
+ this.jobAttrsBytes = jobAttrsBytes;
+ }
+
/**
* @return Job attributes.
*/
@@ -385,22 +462,50 @@ public class GridJobExecuteRequest implements
ExecutorAwareMessage {
/**
* @return Checkpoint SPI name.
*/
- public String getCheckpointSpi() {
+ public String checkpointSpi() {
return cpSpi;
}
+ /**
+ * @param cpSpi New checkpoint SPI name.
+ */
+ public void checkpointSpi(String cpSpi) {
+ this.cpSpi = cpSpi;
+ }
+
/**
* @return Task local class loader id.
*/
- public IgniteUuid getClassLoaderId() {
+ public IgniteUuid classLoaderId() {
return clsLdrId;
}
+ /**
+ * @param clsLdrId New task local class loader id.
+ */
+ public void classLoaderId(IgniteUuid clsLdrId) {
+ this.clsLdrId = clsLdrId;
+ }
+
/**
* @return Deployment mode.
*/
public DeploymentMode getDeploymentMode() {
- return depMode;
+ return depModeMsg.value();
+ }
+
+ /**
+ * @return Deployment mode message.
+ */
+ public DeploymentModeMessage deploymentModeMessage() {
+ return depModeMsg;
+ }
+
+ /**
+ * @param depModeMsg New deployment mode message.
+ */
+ public void deploymentModeMessage(DeploymentModeMessage depModeMsg) {
+ this.depModeMsg = depModeMsg;
}
/**
@@ -408,24 +513,45 @@ public class GridJobExecuteRequest implements
ExecutorAwareMessage {
*
* @return True if siblings list is dynamic.
*/
- public boolean isDynamicSiblings() {
+ public boolean dynamicSiblings() {
return dynamicSiblings;
}
+ /**
+ * @param dynamicSiblings New dynamic siblings flag.
+ */
+ public void dynamicSiblings(boolean dynamicSiblings) {
+ this.dynamicSiblings = dynamicSiblings;
+ }
+
/**
* @return Node class loader participant map.
*/
- public Map<UUID, IgniteUuid> getLoaderParticipants() {
+ public Map<UUID, IgniteUuid> loaderParticipants() {
return ldrParticipants;
}
+ /**
+ * @param ldrParticipants New node class loader participant map.
+ */
+ public void loaderParticipants(Map<UUID, IgniteUuid> ldrParticipants) {
+ this.ldrParticipants = ldrParticipants;
+ }
+
/**
* @return Returns {@code true} if deployment should always be used.
*/
- public boolean isForceLocalDeployment() {
+ public boolean forceLocalDeployment() {
return forceLocDep;
}
+ /**
+ * @param forceLocDep New local deployment forcing flag.
+ */
+ public void forceLocalDeployment(boolean forceLocDep) {
+ this.forceLocDep = forceLocDep;
+ }
+
/**
* @return Topology.
*/
@@ -433,6 +559,13 @@ public class GridJobExecuteRequest implements
ExecutorAwareMessage {
return top;
}
+ /**
+ * @param top New topology.
+ */
+ public void topology(@Nullable Collection<UUID> top) {
+ this.top = top;
+ }
+
/**
* @return Topology predicate.
*/
@@ -443,438 +576,148 @@ public class GridJobExecuteRequest implements
ExecutorAwareMessage {
/**
* @return Marshalled topology predicate.
*/
- public byte[] getTopologyPredicateBytes() {
+ public byte[] topologyPredicateBytes() {
return topPredBytes;
}
+ /**
+ * @param topPredBytes New marshalled topology predicate.
+ */
+ public void topologyPredicateBytes(byte[] topPredBytes) {
+ this.topPredBytes = topPredBytes;
+ }
+
/**
* @return {@code True} if session attributes are enabled.
*/
- public boolean isSessionFullSupport() {
+ public boolean sessionFullSupport() {
return sesFullSup;
}
+ /**
+ * @param sesFullSup New flag, indicating that session attributes are
enabled.
+ */
+ public void sessionFullSupport(boolean sesFullSup) {
+ this.sesFullSup = sesFullSup;
+ }
+
/**
* @return {@code True} if internal job.
*/
- public boolean isInternal() {
+ public boolean internal() {
return internal;
}
+ /**
+ * @param internal New internal job flag.
+ */
+ public void internal(boolean internal) {
+ this.internal = internal;
+ }
+
/**
* @return Caches' identifiers to reserve specified partition for job
execution.
*/
- public int[] getCacheIds() {
- return idsOfCaches;
+ public int[] cacheIds() {
+ return cacheIds;
}
/**
- * @return Partitions to lock for job execution.
+ * @param cacheIds New cache identifiers.
*/
- public int getPartition() {
+ public void cacheIds(int[] cacheIds) {
+ this.cacheIds = cacheIds;
+ }
+
+ /**
+ * @return Partition to lock for job execution.
+ */
+ public int partition() {
return part;
}
+ /**
+ * @param part New partition.
+ */
+ public void partition(int part) {
+ this.part = part;
+ }
+
/** {@inheritDoc} */
@Override public String executorName() {
return execName;
}
+ /**
+ * @param execName New executor name.
+ */
+ public void executorName(String execName) {
+ this.execName = execName;
+ }
+
/**
* @return Affinity version which was used to map job
*/
- public AffinityTopologyVersion getTopVer() {
+ public AffinityTopologyVersion topologyVersion() {
return topVer;
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeIgniteUuid(clsLdrId))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeString(cpSpi))
- return false;
-
- writer.incrementState();
-
- case 2:
- if (!writer.writeByte(depMode != null ?
(byte)depMode.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 3:
- if (!writer.writeBoolean(dynamicSiblings))
- return false;
-
- writer.incrementState();
-
- case 4:
- if (!writer.writeString(execName))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeBoolean(forceLocDep))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeIntArray(idsOfCaches))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeBoolean(internal))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeByteArray(jobAttrsBytes))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeByteArray(jobBytes))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeIgniteUuid(jobId))
- return false;
-
- writer.incrementState();
-
- case 11:
- if (!writer.writeMap(ldrParticipants,
MessageCollectionItemType.UUID, IGNITE_UUID))
- return false;
-
- writer.incrementState();
-
- case 12:
- if (!writer.writeInt(part))
- return false;
-
- writer.incrementState();
-
- case 13:
- if (!writer.writeByteArray(sesAttrsBytes))
- return false;
-
- writer.incrementState();
-
- case 14:
- if (!writer.writeBoolean(sesFullSup))
- return false;
-
- writer.incrementState();
-
- case 15:
- if (!writer.writeIgniteUuid(sesId))
- return false;
-
- writer.incrementState();
-
- case 16:
- if (!writer.writeByteArray(siblingsBytes))
- return false;
-
- writer.incrementState();
-
- case 17:
- if (!writer.writeLong(startTaskTime))
- return false;
-
- writer.incrementState();
-
- case 18:
- if (!writer.writeString(taskClsName))
- return false;
-
- writer.incrementState();
-
- case 19:
- if (!writer.writeString(taskName))
- return false;
-
- writer.incrementState();
-
- case 20:
- if (!writer.writeLong(timeout))
- return false;
-
- writer.incrementState();
-
- case 21:
- if (!writer.writeCollection(top,
MessageCollectionItemType.UUID))
- return false;
-
- writer.incrementState();
-
- case 22:
- if (!writer.writeByteArray(topPredBytes))
- return false;
-
- writer.incrementState();
-
- case 23:
- if (!writer.writeAffinityTopologyVersion(topVer))
- return false;
-
- writer.incrementState();
-
- case 24:
- if (!writer.writeString(userVer))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
+ /**
+ * @param topVer New topology version.
+ */
+ public void topologyVersion(AffinityTopologyVersion topVer) {
+ this.topVer = topVer;
}
/** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- switch (reader.state()) {
- case 0:
- clsLdrId = reader.readIgniteUuid();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- cpSpi = reader.readString();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 2:
- byte depModeOrd;
-
- depModeOrd = reader.readByte();
-
- if (!reader.isLastRead())
- return false;
-
- depMode = DeploymentMode.fromOrdinal(depModeOrd);
-
- reader.incrementState();
-
- case 3:
- dynamicSiblings = reader.readBoolean();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 4:
- execName = reader.readString();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- forceLocDep = reader.readBoolean();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- idsOfCaches = reader.readIntArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- internal = reader.readBoolean();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 8:
- jobAttrsBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- jobBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- jobId = reader.readIgniteUuid();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 11:
- ldrParticipants =
reader.readMap(MessageCollectionItemType.UUID, IGNITE_UUID, false);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 12:
- part = reader.readInt();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 13:
- sesAttrsBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 14:
- sesFullSup = reader.readBoolean();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 15:
- sesId = reader.readIgniteUuid();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 16:
- siblingsBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 17:
- startTaskTime = reader.readLong();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 18:
- taskClsName = reader.readString();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 19:
- taskName = reader.readString();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 20:
- timeout = reader.readLong();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 21:
- top = reader.readCollection(MessageCollectionItemType.UUID);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 22:
- topPredBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 23:
- topVer = reader.readAffinityTopologyVersion();
+ @Override public short directType() {
+ return 1;
+ }
- if (!reader.isLastRead())
- return false;
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridJobExecuteRequest.class, this);
+ }
- reader.incrementState();
+ /**
+ * @param marsh Marshaller.
+ */
+ public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException
{
+ jobBytes = U.marshal(marsh, job);
+ topPredBytes = U.marshal(marsh, topPred);
+ siblingsBytes = U.marshal(marsh, siblings);
+ sesAttrsBytes = U.marshal(marsh, sesAttrs);
+ jobAttrsBytes = U.marshal(marsh, jobAttrs);
+ }
- case 24:
- userVer = reader.readString();
+ /**
+ * @param marsh Marshaller.
+ * @param ldr Class loader.
+ */
+ public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws
IgniteCheckedException {
+ assert top != null || topPredBytes != null;
+ assert sesAttrsBytes != null || !sesFullSup;
- if (!reader.isLastRead())
- return false;
+ if (!dynamicSiblings && siblings == null)
+ siblings = U.unmarshal(marsh, siblingsBytes, ldr);
- reader.incrementState();
+ if (sesFullSup && sesAttrs == null)
+ sesAttrs = U.unmarshal(marsh, sesAttrsBytes, ldr);
- }
+ if (topPred == null && topPredBytes != null)
+ topPred = U.unmarshal(marsh, topPredBytes, ldr);
- return true;
- }
+ if (jobAttrs == null)
+ jobAttrs = U.unmarshal(marsh, jobAttrsBytes, ldr);
- /** {@inheritDoc} */
- @Override public short directType() {
- return 1;
- }
+ if (job == null)
+ job = U.unmarshal(marsh, jobBytes, ldr);
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridJobExecuteRequest.class, this);
+ // Are not required anymore.
+ siblingsBytes = null;
+ sesAttrsBytes = null;
+ topPredBytes = null;
+ jobAttrsBytes = null;
+ jobBytes = null;
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/DeploymentModeMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/DeploymentModeMessage.java
new file mode 100644
index 00000000000..ecd95b3fd9e
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/DeploymentModeMessage.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import org.apache.ignite.configuration.DeploymentMode;
+import org.apache.ignite.internal.MessageProcessor;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.configuration.DeploymentMode.CONTINUOUS;
+import static org.apache.ignite.configuration.DeploymentMode.ISOLATED;
+import static org.apache.ignite.configuration.DeploymentMode.PRIVATE;
+import static org.apache.ignite.configuration.DeploymentMode.SHARED;
+
+/**
+ * Message wrapper for {@link DeploymentMode}. See {@link MessageProcessor}
for details.
+ */
+public class DeploymentModeMessage implements Message {
+ /** Type code. */
+ public static final short TYPE_CODE = 515;
+
+ /** Deployment mode. */
+ private DeploymentMode val;
+
+ /** Code. */
+ @Order(0)
+ private byte code = -1;
+
+ /**
+ * Constructor.
+ */
+ public DeploymentModeMessage() {
+ }
+
+ /**
+ * Constructor.
+ */
+ public DeploymentModeMessage(DeploymentMode depMode) {
+ val = depMode;
+ code = encode(depMode);
+ }
+
+ /**
+ * @return Code.
+ */
+ public byte code() {
+ return code;
+ }
+
+ /**
+ * @param code New code.
+ */
+ public void code(byte code) {
+ this.code = code;
+ val = decode(code);
+ }
+
+ /**
+ * @return Deployment mode.
+ */
+ public DeploymentMode value() {
+ return val;
+ }
+
+ /** @param depMode Deployment mode to encode. */
+ private static byte encode(@Nullable DeploymentMode depMode) {
+ if (depMode == null)
+ return -1;
+
+ switch (depMode) {
+ case PRIVATE: return 0;
+ case ISOLATED: return 1;
+ case SHARED: return 2;
+ case CONTINUOUS: return 3;
+ }
+
+ throw new IllegalArgumentException("Unknown deployment mode: " +
depMode);
+ }
+
+ /** @param code Deployment mode code to decode back to a deployment mode
value. */
+ @Nullable private static DeploymentMode decode(byte code) {
+ switch (code) {
+ case -1: return null;
+ case 0: return PRIVATE;
+ case 1: return ISOLATED;
+ case 2: return SHARED;
+ case 3: return CONTINUOUS;
+ }
+
+ throw new IllegalArgumentException("Unknown deployment mode code: " +
code);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TYPE_CODE;
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 5dfabd85788..4bdc2102d44 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -38,6 +38,7 @@ import
org.apache.ignite.internal.codegen.CachePartitionPartialCountersMapSerial
import org.apache.ignite.internal.codegen.CachePartitionsToReloadMapSerializer;
import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer;
import
org.apache.ignite.internal.codegen.CacheWriteSynchronizationModeMessageSerializer;
+import org.apache.ignite.internal.codegen.DeploymentModeMessageSerializer;
import org.apache.ignite.internal.codegen.ErrorMessageSerializer;
import
org.apache.ignite.internal.codegen.GenerateEncryptionKeyRequestSerializer;
import org.apache.ignite.internal.codegen.GridCacheEntryInfoSerializer;
@@ -78,6 +79,7 @@ import
org.apache.ignite.internal.codegen.GridDistributedTxFinishResponseSeriali
import
org.apache.ignite.internal.codegen.GridDistributedTxPrepareRequestSerializer;
import
org.apache.ignite.internal.codegen.GridDistributedTxPrepareResponseSerializer;
import org.apache.ignite.internal.codegen.GridJobCancelRequestSerializer;
+import org.apache.ignite.internal.codegen.GridJobExecuteRequestSerializer;
import org.apache.ignite.internal.codegen.GridJobExecuteResponseSerializer;
import org.apache.ignite.internal.codegen.GridJobSiblingsRequestSerializer;
import org.apache.ignite.internal.codegen.GridJobSiblingsResponseSerializer;
@@ -320,7 +322,7 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register(TcpCommunicationSpi.HANDSHAKE_MSG_TYPE,
HandshakeMessage::new, new HandshakeMessageSerializer());
factory.register(TcpCommunicationSpi.HANDSHAKE_WAIT_MSG_TYPE,
HandshakeWaitMessage::new, new HandshakeWaitMessageSerializer());
factory.register((short)0, GridJobCancelRequest::new, new
GridJobCancelRequestSerializer());
- factory.register((short)1, GridJobExecuteRequest::new);
+ factory.register((short)1, GridJobExecuteRequest::new, new
GridJobExecuteRequestSerializer());
factory.register((short)2, GridJobExecuteResponse::new, new
GridJobExecuteResponseSerializer());
factory.register((short)3, GridJobSiblingsRequest::new, new
GridJobSiblingsRequestSerializer());
factory.register((short)4, GridJobSiblingsResponse::new, new
GridJobSiblingsResponseSerializer());
@@ -480,6 +482,7 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register(IgniteDhtPartitionsToReloadMap.TYPE_CODE,
IgniteDhtPartitionsToReloadMap::new,
new IgniteDhtPartitionsToReloadMapSerializer());
factory.register(PartitionSizesMap.TYPE_CODE, PartitionSizesMap::new,
new PartitionSizesMapSerializer());
+ factory.register(DeploymentModeMessage.TYPE_CODE,
DeploymentModeMessage::new, new DeploymentModeMessageSerializer());
// [-3..119] [124..129] [-23..-28] [-36..-55] [183..188] - this
// [120..123] - DR
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index aa005edfd2e..4ae62e2d343 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -17,13 +17,10 @@
package org.apache.ignite.internal.processors.job;
-import java.io.Serializable;
import java.util.AbstractCollection;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.UUID;
@@ -89,7 +86,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.spi.collision.CollisionSpi;
@@ -1216,11 +1212,11 @@ public class GridJobProcessor extends
GridProcessorAdapter {
PartitionsReservation partsReservation = null;
- if (req.getCacheIds() != null) {
- assert req.getPartition() >= 0 : req;
- assert !F.isEmpty(req.getCacheIds()) : req;
+ if (req.cacheIds() != null) {
+ assert req.partition() >= 0 : req;
+ assert !F.isEmpty(req.cacheIds()) : req;
- partsReservation = new PartitionsReservation(req.getCacheIds(),
req.getPartition(), req.getTopVer());
+ partsReservation = new PartitionsReservation(req.cacheIds(),
req.partition(), req.topologyVersion());
}
GridJobWorker job = null;
@@ -1233,22 +1229,22 @@ public class GridJobProcessor extends
GridProcessorAdapter {
}
try {
- long endTime = req.getCreateTime() + req.getTimeout();
+ long endTime = req.getCreateTime() + req.timeout();
// Account for overflow.
if (endTime < 0)
endTime = Long.MAX_VALUE;
- GridDeployment tmpDep = req.isForceLocalDeployment() ?
- ctx.deploy().getLocalDeployment(req.getTaskClassName()) :
+ GridDeployment tmpDep = req.forceLocalDeployment() ?
+ ctx.deploy().getLocalDeployment(req.taskClassName()) :
ctx.deploy().getGlobalDeployment(
req.getDeploymentMode(),
- req.getTaskName(),
- req.getTaskClassName(),
- req.getUserVersion(),
+ req.taskName(),
+ req.taskClassName(),
+ req.userVersion(),
node.id(),
- req.getClassLoaderId(),
- req.getLoaderParticipants(),
+ req.classLoaderId(),
+ req.loaderParticipants(),
null);
if (tmpDep == null) {
@@ -1257,7 +1253,7 @@ public class GridJobProcessor extends
GridProcessorAdapter {
// Check local tasks.
for (Map.Entry<String, GridDeployment> d :
ctx.task().getUsedDeploymentMap().entrySet()) {
- if
(d.getValue().classLoaderId().equals(req.getClassLoaderId())) {
+ if
(d.getValue().classLoaderId().equals(req.classLoaderId())) {
assert d.getValue().local();
tmpDep = d.getValue();
@@ -1279,74 +1275,42 @@ public class GridJobProcessor extends
GridProcessorAdapter {
GridJobSessionImpl jobSes;
GridJobContextImpl jobCtx;
- try {
- List<ComputeJobSibling> siblings = null;
-
- if (!req.isDynamicSiblings()) {
- Collection<ComputeJobSibling> siblings0 =
req.getSiblings();
-
- if (siblings0 == null) {
- assert req.getSiblingsBytes() != null;
-
- siblings0 = U.unmarshal(marsh,
req.getSiblingsBytes(), U.resolveClassLoader(ctx.config()));
- }
-
- siblings = new ArrayList<>(siblings0);
- }
-
- Map<Object, Object> sesAttrs = null;
+ boolean loc = ctx.localNodeId().equals(node.id()) &&
!ctx.config().isMarshalLocalJobs();
- if (req.isSessionFullSupport()) {
- sesAttrs = req.getSessionAttributes();
-
- if (sesAttrs == null)
- sesAttrs = U.unmarshal(marsh,
req.getSessionAttributesBytes(),
- U.resolveClassLoader(dep.classLoader(),
ctx.config()));
- }
-
- IgnitePredicate<ClusterNode> topPred =
req.getTopologyPredicate();
-
- if (topPred == null && req.getTopologyPredicateBytes()
!= null) {
- topPred = U.unmarshal(marsh,
req.getTopologyPredicateBytes(),
- U.resolveClassLoader(dep.classLoader(),
ctx.config()));
- }
+ try {
+ if (!loc)
+ req.finishUnmarshal(marsh,
U.resolveClassLoader(dep.classLoader(), ctx.config()));
// Note that we unmarshal session/job attributes here
with proper class loader.
GridTaskSessionImpl taskSes =
ctx.session().createTaskSession(
- req.getSessionId(),
+ req.sessionId(),
node.id(),
- req.getTaskName(),
+ req.taskName(),
dep,
- req.getTaskClassName(),
+ req.taskClassName(),
req.topology(),
- topPred,
- req.getStartTaskTime(),
+ req.getTopologyPredicate(),
+ req.startTaskTime(),
endTime,
- siblings,
- sesAttrs,
- req.isSessionFullSupport(),
- req.isInternal(),
+ req.getSiblings(),
+ req.getSessionAttributes(),
+ req.sessionFullSupport(),
+ req.internal(),
req.executorName(),
ctx.security().securityContext()
);
- taskSes.setCheckpointSpi(req.getCheckpointSpi());
+ taskSes.setCheckpointSpi(req.checkpointSpi());
taskSes.setClassLoader(dep.classLoader());
- jobSes = new GridJobSessionImpl(ctx, taskSes,
req.getJobId());
-
- Map<? extends Serializable, ? extends Serializable>
jobAttrs = req.getJobAttributes();
-
- if (jobAttrs == null)
- jobAttrs = U.unmarshal(marsh,
req.getJobAttributesBytes(),
- U.resolveClassLoader(dep.classLoader(),
ctx.config()));
+ jobSes = new GridJobSessionImpl(ctx, taskSes,
req.jobId());
- jobCtx = new GridJobContextImpl(ctx, req.getJobId(),
jobAttrs);
+ jobCtx = new GridJobContextImpl(ctx, req.jobId(),
req.getJobAttributes());
}
catch (IgniteCheckedException e) {
IgniteException ex = new IgniteException("Failed to
deserialize task attributes " +
- "[taskName=" + req.getTaskName() + ",
taskClsName=" + req.getTaskClassName() +
- ", codeVer=" + req.getUserVersion() + ",
taskClsLdr=" + dep.classLoader() + ']', e);
+ "[taskName=" + req.taskName() + ", taskClsName=" +
req.taskClassName() +
+ ", codeVer=" + req.userVersion() + ", taskClsLdr="
+ dep.classLoader() + ']', e);
U.error(log, ex.getMessage(), e);
@@ -1361,14 +1325,13 @@ public class GridJobProcessor extends
GridProcessorAdapter {
req.getCreateTime(),
jobSes,
jobCtx,
- req.getJobBytes(),
req.getJob(),
node,
- req.isInternal(),
+ req.internal(),
evtLsnr,
holdLsnr,
partsReservation,
- req.getTopVer(),
+ req.topologyVersion(),
req.executorName(),
this::computeJobWorkerInterruptTimeout
);
@@ -1378,7 +1341,7 @@ public class GridJobProcessor extends
GridProcessorAdapter {
// If exception occurs on job initialization, deployment
is released in job listener.
releaseDep = false;
- if (job.initialize(dep,
dep.deployedClass(req.getTaskClassName()).get1())) {
+ if (job.initialize(dep,
dep.deployedClass(req.taskClassName()).get1())) {
// Internal jobs will always be executed synchronously.
if (job.isInternal()) {
// This is an internal job and can be executed
inside busy lock
@@ -1424,7 +1387,7 @@ public class GridJobProcessor extends
GridProcessorAdapter {
U.error(log, "Received computation request
with duplicate job ID (could be " +
"network malfunction, source node may hang
if task timeout was not set) " +
"[srcNode=" + node.id() +
- ", jobId=" + req.getJobId() + ", sesId=" +
req.getSessionId() +
+ ", jobId=" + req.jobId() + ", sesId=" +
req.sessionId() +
", locNodeId=" + ctx.localNodeId() + ']');
// No sync execution.
@@ -1438,9 +1401,9 @@ public class GridJobProcessor extends
GridProcessorAdapter {
else {
// Deployment is null.
IgniteException ex = new IgniteDeploymentException("Task
was not deployed or was redeployed since " +
- "task execution [taskName=" + req.getTaskName() + ",
taskClsName=" + req.getTaskClassName() +
- ", codeVer=" + req.getUserVersion() + ", clsLdrId=" +
req.getClassLoaderId() +
- ", seqNum=" + req.getClassLoaderId().localId() + ",
depMode=" + req.getDeploymentMode() +
+ "task execution [taskName=" + req.taskName() + ",
taskClsName=" + req.taskClassName() +
+ ", codeVer=" + req.userVersion() + ", clsLdrId=" +
req.classLoaderId() +
+ ", seqNum=" + req.classLoaderId().localId() + ",
depMode=" + req.getDeploymentMode() +
", dep=" + dep + ']');
U.error(log, ex.getMessage(), ex);
@@ -1645,17 +1608,17 @@ public class GridJobProcessor extends
GridProcessorAdapter {
if (sndNode == null) {
U.warn(log, "Failed to reply to sender node because it left grid
[nodeId=" + node.id() +
- ", jobId=" + req.getJobId() + ']');
+ ", jobId=" + req.jobId() + ']');
if (ctx.event().isRecordable(EVT_JOB_FAILED)) {
JobEvent evt = new JobEvent();
- evt.jobId(req.getJobId());
- evt.message("Job reply failed (original task node left grid):
" + req.getJobId());
+ evt.jobId(req.jobId());
+ evt.message("Job reply failed (original task node left grid):
" + req.jobId());
evt.node(ctx.discovery().localNode());
- evt.taskName(req.getTaskName());
- evt.taskClassName(req.getTaskClassName());
- evt.taskSessionId(req.getSessionId());
+ evt.taskName(req.taskName());
+ evt.taskClassName(req.taskClassName());
+ evt.taskSessionId(req.sessionId());
evt.type(EVT_JOB_FAILED);
evt.taskNode(node);
evt.taskSubjectId(securitySubjectId(ctx));
@@ -1672,8 +1635,8 @@ public class GridJobProcessor extends
GridProcessorAdapter {
GridJobExecuteResponse jobRes = new GridJobExecuteResponse(
locNodeId,
- req.getSessionId(),
- req.getJobId(),
+ req.sessionId(),
+ req.jobId(),
ex,
null,
null,
@@ -1683,12 +1646,12 @@ public class GridJobProcessor extends
GridProcessorAdapter {
if (!loc)
jobRes.marshallUserData(marsh, log);
- if (req.isSessionFullSupport()) {
+ if (req.sessionFullSupport()) {
// Send response to designated job topic.
// Always go through communication to preserve order,
// if attributes are enabled.
// Job response topic.
- Object topic = TOPIC_TASK.topic(req.getJobId(), locNodeId);
+ Object topic = TOPIC_TASK.topic(req.jobId(), locNodeId);
long timeout = endTime - U.currentTimeMillis();
@@ -1702,7 +1665,7 @@ public class GridJobProcessor extends
GridProcessorAdapter {
sndNode,
topic,
jobRes,
- req.isInternal() ? MANAGEMENT_POOL : SYSTEM_POOL,
+ req.internal() ? MANAGEMENT_POOL : SYSTEM_POOL,
timeout,
false);
}
@@ -1710,30 +1673,30 @@ public class GridJobProcessor extends
GridProcessorAdapter {
ctx.task().processJobExecuteResponse(ctx.localNodeId(),
jobRes);
else
// Send response to common topic as unordered message.
- ctx.io().sendToGridTopic(sndNode, TOPIC_TASK, jobRes,
req.isInternal() ? MANAGEMENT_POOL : SYSTEM_POOL);
+ ctx.io().sendToGridTopic(sndNode, TOPIC_TASK, jobRes,
req.internal() ? MANAGEMENT_POOL : SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
// The only option here is to log, as we must assume that
resending will fail too.
if ((e instanceof ClusterTopologyCheckedException) ||
isDeadNode(node.id()))
// Avoid stack trace for left nodes.
U.error(log, "Failed to reply to sender node because it left
grid [nodeId=" + node.id() +
- ", jobId=" + req.getJobId() + ']');
+ ", jobId=" + req.jobId() + ']');
else {
assert sndNode != null;
U.error(log, "Error sending reply for job [nodeId=" +
sndNode.id() + ", jobId=" +
- req.getJobId() + ']', e);
+ req.jobId() + ']', e);
}
if (ctx.event().isRecordable(EVT_JOB_FAILED)) {
JobEvent evt = new JobEvent();
- evt.jobId(req.getJobId());
- evt.message("Failed to send reply for job: " + req.getJobId());
+ evt.jobId(req.jobId());
+ evt.message("Failed to send reply for job: " + req.jobId());
evt.node(ctx.discovery().localNode());
- evt.taskName(req.getTaskName());
- evt.taskClassName(req.getTaskClassName());
- evt.taskSessionId(req.getSessionId());
+ evt.taskName(req.taskName());
+ evt.taskClassName(req.taskClassName());
+ evt.taskSessionId(req.sessionId());
evt.type(EVT_JOB_FAILED);
evt.taskNode(node);
evt.taskSubjectId(securitySubjectId(ctx));
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index 4fd7eff87e9..3f481dd8eff 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -66,7 +66,6 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.marshaller.MarshallerUtils;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_JOB_CANCELLED;
@@ -115,9 +114,6 @@ public class GridJobWorker extends GridWorker implements
GridTimeoutObject {
/** */
private final Object taskTopic;
- /** */
- private byte[] jobBytes;
-
/** Task originating node. */
private final ClusterNode taskNode;
@@ -197,7 +193,6 @@ public class GridJobWorker extends GridWorker implements
GridTimeoutObject {
* @param createTime Create time.
* @param ses Grid task session.
* @param jobCtx Job context.
- * @param jobBytes Grid job bytes.
* @param job Job.
* @param taskNode Grid task node.
* @param internal Whether or not task was marked with {@link GridInternal}
@@ -215,7 +210,6 @@ public class GridJobWorker extends GridWorker implements
GridTimeoutObject {
long createTime,
GridJobSessionImpl ses,
GridJobContextImpl jobCtx,
- byte[] jobBytes,
ComputeJob job,
ClusterNode taskNode,
boolean internal,
@@ -241,7 +235,6 @@ public class GridJobWorker extends GridWorker implements
GridTimeoutObject {
this.dep = dep;
this.ses = ses;
this.jobCtx = jobCtx;
- this.jobBytes = jobBytes;
this.taskNode = taskNode;
this.internal = internal;
this.holdLsnr = holdLsnr;
@@ -249,9 +242,7 @@ public class GridJobWorker extends GridWorker implements
GridTimeoutObject {
this.reqTopVer = reqTopVer;
this.execName = execName;
this.jobInterruptTimeoutSupplier = jobInterruptTimeoutSupplier;
-
- if (job != null)
- this.job = job;
+ this.job = job;
log = U.logger(ctx, logRef, this);
@@ -480,20 +471,6 @@ public class GridJobWorker extends GridWorker implements
GridTimeoutObject {
IgniteException ex = null;
try {
- if (job == null) {
- MarshallerUtils.jobSenderVersion(taskNode.version());
-
- try {
- job = U.unmarshal(marsh, jobBytes,
U.resolveClassLoader(dep.classLoader(), ctx.config()));
- }
- finally {
- MarshallerUtils.jobSenderVersion(null);
- }
-
- // No need to hold reference any more.
- jobBytes = null;
- }
-
// Inject resources.
ctx.resource().inject(dep, taskCls, job, ses, jobCtx);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 62e9944210e..0d0de757abd 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -88,7 +88,6 @@ import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.plugin.security.SecurityException;
import org.apache.ignite.resources.TaskContinuousMapperResource;
import org.jetbrains.annotations.Nullable;
@@ -1388,48 +1387,38 @@ public class GridTaskWorker<T, R> extends GridWorker
implements GridTimeoutObjec
boolean forceLocDep = internal || !ctx.deploy().enabled();
- try {
- MarshallerUtils.jobReceiverVersion(node.version());
-
- req = new GridJobExecuteRequest(
- ses.getId(),
- res.getJobContext().getJobId(),
- ses.getTaskName(),
- ses.getUserVersion(),
- ses.getTaskClassName(),
- loc ? null : U.marshal(marsh, res.getJob()),
- loc ? res.getJob() : null,
- ses.getStartTime(),
- timeout,
- ses.getTopology(),
- loc ? ses.getTopologyPredicate() : null,
- loc ? null : U.marshal(marsh,
ses.getTopologyPredicate()),
- loc ? null : U.marshal(marsh,
ses.getJobSiblings()),
- loc ? ses.getJobSiblings() : null,
- loc ? null : U.marshal(marsh, sesAttrs),
- loc ? sesAttrs : null,
- loc ? null : U.marshal(marsh, jobAttrs),
- loc ? jobAttrs : null,
- ses.getCheckpointSpi(),
- dep.classLoaderId(),
- dep.deployMode(),
- continuous,
- dep.participants(),
- forceLocDep,
- ses.isFullSupport(),
- internal,
- affCacheIds,
- affPartId,
- mapTopVer,
- ses.executorName());
- }
- finally {
- MarshallerUtils.jobReceiverVersion(null);
- }
+ req = new GridJobExecuteRequest(
+ ses.getId(),
+ res.getJobContext().getJobId(),
+ ses.getTaskName(),
+ ses.getUserVersion(),
+ ses.getTaskClassName(),
+ res.getJob(),
+ ses.getStartTime(),
+ timeout,
+ ses.getTopology(),
+ ses.getTopologyPredicate(),
+ ses.getJobSiblings(),
+ sesAttrs,
+ jobAttrs,
+ ses.getCheckpointSpi(),
+ dep.classLoaderId(),
+ dep.deployMode(),
+ continuous,
+ dep.participants(),
+ forceLocDep,
+ ses.isFullSupport(),
+ internal,
+ affCacheIds,
+ affPartId,
+ mapTopVer,
+ ses.executorName());
if (loc)
ctx.job().processJobExecuteRequest(ctx.discovery().localNode(), req);
else {
+ req.prepareMarshal(marsh);
+
byte plc;
if (internal)
diff --git
a/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java
b/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java
index fb23c8bcbbd..4d474aa826a 100644
---
a/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java
@@ -34,7 +34,6 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.ClassSet;
-import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.plugin.PluginProvider;
import org.jetbrains.annotations.Nullable;
@@ -57,12 +56,6 @@ public class MarshallerUtils {
/** Default white list class names file. */
public static final String DEFAULT_WHITELIST_CLS_NAMES_FILE =
"META-INF/classnames-default-whitelist.properties";
- /** Job sender node version. */
- private static final ThreadLocal<IgniteProductVersion> JOB_SND_NODE_VER =
new ThreadLocal<>();
-
- /** Job sender node version. */
- private static final ThreadLocal<IgniteProductVersion> JOB_RCV_NODE_VER =
new ThreadLocal<>();
-
/** */
private static final Object MUX = new Object();
@@ -73,42 +66,6 @@ public class MarshallerUtils {
// No-op.
}
- /**
- * Sets thread local job sender node version.
- *
- * @param ver Thread local job sender node version.
- */
- public static void jobSenderVersion(IgniteProductVersion ver) {
- JOB_SND_NODE_VER.set(ver);
- }
-
- /**
- * Returns thread local job sender node version.
- *
- * @return Thread local job sender node version.
- */
- public static IgniteProductVersion jobSenderVersion() {
- return JOB_SND_NODE_VER.get();
- }
-
- /**
- * Sets thread local job receiver node version.
- *
- * @param ver Thread local job receiver node version.
- */
- public static void jobReceiverVersion(IgniteProductVersion ver) {
- JOB_RCV_NODE_VER.set(ver);
- }
-
- /**
- * Returns thread local job receiver node version.
- *
- * @return Thread local job receiver node version.
- */
- public static IgniteProductVersion jobReceiverVersion() {
- return JOB_RCV_NODE_VER.get();
- }
-
/**
* Returns class name filter for marshaller.
*
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ServiceAwarenessTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ServiceAwarenessTest.java
index ebfdb0e6f24..5c87c1819db 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ServiceAwarenessTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ServiceAwarenessTest.java
@@ -513,7 +513,7 @@ public class ServiceAwarenessTest extends
AbstractThinClientTest {
G.allGrids().forEach(g ->
((IgniteEx)g).context().io().addMessageListener(GridTopic.TOPIC_JOB, new
GridMessageListener() {
@Override public void onMessage(UUID nodeId, Object msg, byte plc)
{
if (msg instanceof GridJobExecuteRequest
- &&
((GridJobExecuteRequest)msg).getTaskClassName().contains(GridServiceProxy.class.getName()))
+ &&
((GridJobExecuteRequest)msg).taskClassName().contains(GridServiceProxy.class.getName()))
redirectCnt.incrementAndGet();
}
}));
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/DeploymentModeMessageTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/DeploymentModeMessageTest.java
new file mode 100644
index 00000000000..5821a8d73e9
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/DeploymentModeMessageTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import org.apache.ignite.configuration.DeploymentMode;
+import org.apache.ignite.internal.util.typedef.F;
+import org.junit.Test;
+
+import static org.apache.ignite.configuration.DeploymentMode.CONTINUOUS;
+import static org.apache.ignite.configuration.DeploymentMode.ISOLATED;
+import static org.apache.ignite.configuration.DeploymentMode.PRIVATE;
+import static org.apache.ignite.configuration.DeploymentMode.SHARED;
+import static
org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/** Checks the mapping between {@link DeploymentMode} values and {@link DeploymentModeMessage} codes. */
+public class DeploymentModeMessageTest {
+ /** Checks the code reported for each mode: -1 for {@code null}, 0..3 for the enum constants. */
+ @Test
+ public void testDeploymentModeCode() {
+ assertEquals(-1, new DeploymentModeMessage(null).code());
+ assertEquals(0, new DeploymentModeMessage(PRIVATE).code());
+ assertEquals(1, new DeploymentModeMessage(ISOLATED).code());
+ assertEquals(2, new DeploymentModeMessage(SHARED).code());
+ assertEquals(3, new DeploymentModeMessage(CONTINUOUS).code());
+
+ for (DeploymentMode depMode : DeploymentMode.values()) {
+ assertTrue(new DeploymentModeMessage(depMode).code() >= 0);
+ assertTrue(new DeploymentModeMessage(depMode).code() < 4);
+ }
+ }
+
+ /** Checks that codes -1..3 decode to the expected mode and that code 4 fails with {@link IllegalArgumentException}. */
+ @Test
+ public void testDeploymentModeFromCode() {
+ DeploymentModeMessage msg = new DeploymentModeMessage(null);
+
+ msg.code((byte)-1);
+ assertNull(msg.value());
+
+ msg.code((byte)0);
+ assertSame(PRIVATE, msg.value());
+
+ msg.code((byte)1);
+ assertSame(ISOLATED, msg.value());
+
+ msg.code((byte)2);
+ assertSame(SHARED, msg.value());
+
+ msg.code((byte)3);
+ assertSame(CONTINUOUS, msg.value());
+
+ Throwable t = assertThrowsWithCause(() -> msg.code((byte)4),
IllegalArgumentException.class);
+ assertEquals("Unknown deployment mode code: 4", t.getMessage());
+ }
+
+ /** Checks that every mode, including {@code null}, survives a round trip through its code into a new message. */
+ @Test
+ public void testConversionConsistency() {
+ for (DeploymentMode depMode : F.concat(DeploymentMode.values(),
(DeploymentMode)null)) {
+ DeploymentModeMessage msg = new DeploymentModeMessage(depMode);
+
+ assertEquals(depMode, msg.value());
+
+ DeploymentModeMessage newMsg = new DeploymentModeMessage();
+ newMsg.code(msg.code());
+
+ assertEquals(msg.value(), newMsg.value());
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLongRunningTransactionDiagnosticsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLongRunningTransactionDiagnosticsTest.java
index b2fa56a1389..50201a8610a 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLongRunningTransactionDiagnosticsTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLongRunningTransactionDiagnosticsTest.java
@@ -171,7 +171,7 @@ public class GridCacheLongRunningTransactionDiagnosticsTest
extends GridCommonAb
client.context().io().addMessageListener(
GridTopic.TOPIC_JOB,
- (nodeId, msg, plc) ->
taskNameContainer.append(((GridJobExecuteRequest)msg).getTaskName())
+ (nodeId, msg, plc) ->
taskNameContainer.append(((GridJobExecuteRequest)msg).taskName())
);
try (Transaction tx = client.transactions().txStart(PESSIMISTIC,
REPEATABLE_READ, TX_TIMEOUT, 1)) {
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 83c66539bc2..bb0632c41aa 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.TransactionsMXBeanImplTest;
import org.apache.ignite.internal.codegen.MessageProcessorTest;
import
org.apache.ignite.internal.managers.communication.CacheEntryPredicateAdapterMessageTest;
import
org.apache.ignite.internal.managers.communication.CacheWriteSynchroizationModeMessageTest;
+import
org.apache.ignite.internal.managers.communication.DeploymentModeMessageTest;
import org.apache.ignite.internal.managers.communication.ErrorMessageSelfTest;
import
org.apache.ignite.internal.managers.communication.GridCacheOperationModeMessageTest;
import
org.apache.ignite.internal.managers.communication.TransactionIsolationMessageTest;
@@ -153,7 +154,8 @@ import org.junit.runners.Suite;
TransactionIsolationMessageTest.class,
GridCacheOperationModeMessageTest.class,
CacheWriteSynchroizationModeMessageTest.class,
- CacheEntryPredicateAdapterMessageTest.class
+ CacheEntryPredicateAdapterMessageTest.class,
+ DeploymentModeMessageTest.class,
})
public class IgniteBasicTestSuite {
}