Repository: ignite Updated Branches: refs/heads/ignite-zk a11e06a5b -> f50c7ccb5
ignite-1267 Fixed job stealing so that newly joined node is able to steal jobs Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/24f90874 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/24f90874 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/24f90874 Branch: refs/heads/ignite-zk Commit: 24f908748b6d284eac0a2795366e80d9e06e19ff Parents: ff3712c Author: Andrey V. Mashenkov <[email protected]> Authored: Tue Dec 5 10:22:50 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue Dec 5 10:22:50 2017 +0300 ---------------------------------------------------------------------- .../ignite/internal/GridJobExecuteRequest.java | 133 +++++++++----- .../ignite/internal/GridTaskSessionImpl.java | 18 ++ .../ignite/internal/IgniteComputeImpl.java | 8 +- .../processors/job/GridJobProcessor.java | 8 + .../session/GridTaskSessionProcessor.java | 6 + .../processors/task/GridTaskProcessor.java | 15 +- .../task/GridTaskThreadContextKey.java | 3 + .../processors/task/GridTaskWorker.java | 2 + .../internal/GridJobStealingSelfTest.java | 7 +- .../GridMultithreadedJobStealingSelfTest.java | 176 ++++++++++++++----- 10 files changed, 283 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/24f90874/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java index fe2d6d8..4357d1d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.util.Collection; import java.util.Map; import java.util.UUID; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.ComputeJobSibling; import org.apache.ignite.configuration.DeploymentMode; @@ -31,6 +32,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; @@ -137,6 +139,13 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage { private Collection<UUID> top; /** */ + @GridDirectTransient + private IgnitePredicate<ClusterNode> topPred; + + /** */ + private byte[] topPredBytes; + + /** */ private int[] idsOfCaches; /** */ @@ -166,6 +175,8 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage { * @param startTaskTime Task execution start time. * @param timeout Task execution timeout. * @param top Topology. + * @param topPred Topology predicate. + * @param topPredBytes Marshalled topology predicate. * @param siblingsBytes Serialized collection of split siblings. * @param siblings Collection of split siblings. * @param sesAttrsBytes Map of session attributes. @@ -197,6 +208,8 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage { long startTaskTime, long timeout, @Nullable Collection<UUID> top, + @Nullable IgnitePredicate<ClusterNode> topPred, + byte[] topPredBytes, byte[] siblingsBytes, Collection<ComputeJobSibling> siblings, byte[] sesAttrsBytes, @@ -216,7 +229,6 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage { int part, @Nullable AffinityTopologyVersion topVer, @Nullable String execName) { - this.top = top; assert sesId != null; assert jobId != null; assert taskName != null; @@ -224,6 +236,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage { assert job != null || jobBytes != null; assert sesAttrs != null || sesAttrsBytes != null || !sesFullSup; assert jobAttrs != null || jobAttrsBytes != null; + assert top != null || topPred != null || topPredBytes != null; assert clsLdrId != null; assert userVer != null; assert depMode != null; @@ -238,6 +251,9 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage { this.startTaskTime = startTaskTime; this.timeout = timeout; this.top = top; + this.topVer = topVer; + this.topPred = topPred; + this.topPredBytes = topPredBytes; this.siblingsBytes = siblingsBytes; this.siblings = siblings; this.sesAttrsBytes = sesAttrsBytes; @@ -424,6 +440,21 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage { @Nullable public Collection<UUID> topology() { return top; } + + /** + * @return Topology predicate. + */ + public IgnitePredicate<ClusterNode> getTopologyPredicate() { + return topPred; + } + + /** + * @return Marshalled topology predicate. + */ + public byte[] getTopologyPredicateBytes() { + return topPredBytes; + } + /** * @return {@code True} if session attributes are enabled. */ @@ -513,127 +544,133 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage { writer.incrementState(); case 4: - if (!writer.writeBoolean("forceLocDep", forceLocDep)) + if (!writer.writeString("execName", execName)) return false; writer.incrementState(); case 5: - if (!writer.writeIntArray("idsOfCaches", idsOfCaches)) + if (!writer.writeBoolean("forceLocDep", forceLocDep)) return false; writer.incrementState(); case 6: - if (!writer.writeBoolean("internal", internal)) + if (!writer.writeIntArray("idsOfCaches", idsOfCaches)) return false; writer.incrementState(); case 7: - if (!writer.writeByteArray("jobAttrsBytes", jobAttrsBytes)) + if (!writer.writeBoolean("internal", internal)) return false; writer.incrementState(); case 8: - if (!writer.writeByteArray("jobBytes", jobBytes)) + if (!writer.writeByteArray("jobAttrsBytes", jobAttrsBytes)) return false; writer.incrementState(); case 9: - if (!writer.writeIgniteUuid("jobId", jobId)) + if (!writer.writeByteArray("jobBytes", jobBytes)) return false; writer.incrementState(); case 10: - if (!writer.writeMap("ldrParticipants", ldrParticipants, MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID)) + if (!writer.writeIgniteUuid("jobId", jobId)) return false; writer.incrementState(); case 11: - if (!writer.writeInt("part", part)) + if (!writer.writeMap("ldrParticipants", ldrParticipants, MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID)) return false; writer.incrementState(); case 12: - if (!writer.writeByteArray("sesAttrsBytes", sesAttrsBytes)) + if (!writer.writeInt("part", part)) return false; writer.incrementState(); case 13: - if (!writer.writeBoolean("sesFullSup", sesFullSup)) + if (!writer.writeByteArray("sesAttrsBytes", sesAttrsBytes)) return false; writer.incrementState(); case 14: - if (!writer.writeIgniteUuid("sesId", sesId)) + if (!writer.writeBoolean("sesFullSup", sesFullSup)) return false; writer.incrementState(); case 15: - if (!writer.writeByteArray("siblingsBytes", siblingsBytes)) + if (!writer.writeIgniteUuid("sesId", sesId)) return false; writer.incrementState(); case 16: - if (!writer.writeLong("startTaskTime", startTaskTime)) + if (!writer.writeByteArray("siblingsBytes", siblingsBytes)) return false; writer.incrementState(); case 17: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeLong("startTaskTime", startTaskTime)) return false; writer.incrementState(); case 18: - if (!writer.writeString("taskClsName", taskClsName)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 19: - if (!writer.writeString("taskName", taskName)) + if (!writer.writeString("taskClsName", taskClsName)) return false; writer.incrementState(); case 20: - if (!writer.writeLong("timeout", timeout)) + if (!writer.writeString("taskName", taskName)) return false; writer.incrementState(); case 21: - if (!writer.writeCollection("top", top, MessageCollectionItemType.UUID)) + if (!writer.writeLong("timeout", timeout)) return false; writer.incrementState(); case 22: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeCollection("top", top, MessageCollectionItemType.UUID)) return false; writer.incrementState(); case 23: - if (!writer.writeString("userVer", userVer)) + if (!writer.writeByteArray("topPredBytes", topPredBytes)) return false; writer.incrementState(); case 24: - if (!writer.writeString("executorName", execName)) + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + + case 25: + if (!writer.writeString("userVer", userVer)) return false; writer.incrementState(); @@ -688,7 +725,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage { reader.incrementState(); case 4: - forceLocDep = reader.readBoolean("forceLocDep"); + execName = reader.readString("execName"); if (!reader.isLastRead()) return false; @@ -696,7 +733,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage { reader.incrementState(); case 5: - idsOfCaches = reader.readIntArray("idsOfCaches"); + forceLocDep = reader.readBoolean("forceLocDep"); if (!reader.isLastRead()) return false; @@ -704,7 +741,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage { reader.incrementState(); case 6: - internal = reader.readBoolean("internal"); + idsOfCaches = reader.readIntArray("idsOfCaches"); if (!reader.isLastRead()) return false; @@ -712,7 +749,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage { reader.incrementState(); case 7: - jobAttrsBytes = reader.readByteArray("jobAttrsBytes"); + internal = reader.readBoolean("internal"); if (!reader.isLastRead()) return false; @@ -720,7 +757,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage { reader.incrementState(); case 8: - jobBytes = reader.readByteArray("jobBytes"); + jobAttrsBytes = reader.readByteArray("jobAttrsBytes"); if (!reader.isLastRead()) return false; @@ -728,7 +765,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage { reader.incrementState(); case 9: - jobId = reader.readIgniteUuid("jobId"); + jobBytes = reader.readByteArray("jobBytes"); if (!reader.isLastRead()) return false; @@ -736,7 +773,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage { reader.incrementState(); case 10: - ldrParticipants = reader.readMap("ldrParticipants", MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false); + jobId = reader.readIgniteUuid("jobId"); if (!reader.isLastRead()) return false; @@ -744,7 +781,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage { reader.incrementState(); case 11: - part = reader.readInt("part"); + ldrParticipants = reader.readMap("ldrParticipants", MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false); if (!reader.isLastRead()) return false; @@ -752,7 +789,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage { reader.incrementState(); case 12: - sesAttrsBytes = reader.readByteArray("sesAttrsBytes"); + part = reader.readInt("part"); if (!reader.isLastRead()) return false; @@ -760,7 +797,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage { reader.incrementState(); case 13: - sesFullSup = reader.readBoolean("sesFullSup"); + sesAttrsBytes = reader.readByteArray("sesAttrsBytes"); if (!reader.isLastRead()) return false; @@ -768,7 +805,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage { reader.incrementState(); case 14: - sesId = reader.readIgniteUuid("sesId"); + sesFullSup = reader.readBoolean("sesFullSup"); if (!reader.isLastRead()) return false; @@ -776,7 +813,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage { reader.incrementState(); case 15: - siblingsBytes = reader.readByteArray("siblingsBytes"); + sesId = reader.readIgniteUuid("sesId"); if (!reader.isLastRead()) return false; @@ -784,7 +821,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage { reader.incrementState(); case 16: - startTaskTime = reader.readLong("startTaskTime"); + siblingsBytes = reader.readByteArray("siblingsBytes"); if (!reader.isLastRead()) return false; @@ -792,7 +829,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage { reader.incrementState(); case 17: - subjId = reader.readUuid("subjId"); + startTaskTime = reader.readLong("startTaskTime"); if (!reader.isLastRead()) return false; @@ -800,7 +837,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage { reader.incrementState(); case 18: - taskClsName = reader.readString("taskClsName"); + subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) return false; @@ -808,7 +845,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage { reader.incrementState(); case 19: - taskName = reader.readString("taskName"); + taskClsName = reader.readString("taskClsName"); if (!reader.isLastRead()) return false; @@ -816,7 +853,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage { reader.incrementState(); case 20: - timeout = reader.readLong("timeout"); + taskName = reader.readString("taskName"); if (!reader.isLastRead()) return false; @@ -824,7 +861,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage { reader.incrementState(); case 21: - top = reader.readCollection("top", MessageCollectionItemType.UUID); + timeout = reader.readLong("timeout"); if (!reader.isLastRead()) return false; @@ -832,7 +869,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage { reader.incrementState(); case 22: - topVer = reader.readMessage("topVer"); + top = reader.readCollection("top", MessageCollectionItemType.UUID); if (!reader.isLastRead()) return false; @@ -840,7 +877,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage { reader.incrementState(); case 23: - userVer = reader.readString("userVer"); + topPredBytes = reader.readByteArray("topPredBytes"); if (!reader.isLastRead()) return false; @@ -848,7 +885,15 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage { reader.incrementState(); case 24: - execName = reader.readString("executorName"); + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 25: + userVer = reader.readString("userVer"); if (!reader.isLastRead()) return false; @@ -867,7 +912,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 25; + return 26; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/24f90874/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java index 458ad36..ce6e831 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java @@ -27,6 +27,7 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.compute.ComputeJobSibling; import org.apache.ignite.compute.ComputeTaskSessionAttributeListener; import org.apache.ignite.compute.ComputeTaskSessionScope; @@ -38,6 +39,7 @@ import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; @@ -109,6 +111,9 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal { private final Collection<UUID> top; /** */ + private final IgnitePredicate<ClusterNode> topPred; + + /** */ private final UUID subjId; /** */ @@ -124,6 +129,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal { * @param taskClsName Task class name. * @param sesId Task session ID. * @param top Topology. + * @param topPred Topology predicate. * @param startTime Task execution start time. * @param endTime Task execution end time. * @param siblings Collection of siblings. @@ -141,6 +147,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal { String taskClsName, IgniteUuid sesId, @Nullable Collection<UUID> top, + @Nullable IgnitePredicate<ClusterNode> topPred, long startTime, long endTime, Collection<ComputeJobSibling> siblings, @@ -159,6 +166,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal { this.taskName = taskName; this.dep = dep; this.top = top; + this.topPred = topPred; // Note that class name might be null here if task was not explicitly // deployed. @@ -772,8 +780,18 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal { return ctx.checkpoint().removeCheckpoint(ses, key); } + /** + * @return Topology predicate. + */ + @Nullable public IgnitePredicate<ClusterNode> getTopologyPredicate() { + return topPred; + } + /** {@inheritDoc} */ @Override public Collection<UUID> getTopology() { + if (topPred != null) + return F.viewReadOnly(ctx.discovery().allNodes(), F.node2id(), topPred); + return top != null ? top : F.nodeIds(ctx.discovery().allNodes()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/24f90874/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java index 06619f9..8d473e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java @@ -52,7 +52,7 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.GridClosureCallMode.BALANCE; import static org.apache.ignite.internal.GridClosureCallMode.BROADCAST; import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER; -import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID; +import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID_PREDICATE; import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBJ_ID; import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TASK_NAME; import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TIMEOUT; @@ -481,7 +481,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> guard(); try { - ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes()); + ctx.task().setThreadContextIfNotNull(TC_SUBGRID_PREDICATE, prj.predicate()); ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId); return ctx.task().execute(taskName, arg, execName); @@ -521,7 +521,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> guard(); try { - ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes()); + ctx.task().setThreadContextIfNotNull(TC_SUBGRID_PREDICATE, prj.predicate()); ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId); return ctx.task().execute(taskCls, arg, execName); @@ -560,7 +560,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> guard(); try { - ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes()); + ctx.task().setThreadContextIfNotNull(TC_SUBGRID_PREDICATE, prj.predicate()); ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId); return ctx.task().execute(task, arg, execName); http://git-wip-us.apache.org/repos/asf/ignite/blob/24f90874/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java index 9052543..a5add4e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java @@ -1043,6 +1043,13 @@ public class GridJobProcessor extends GridProcessorAdapter { U.resolveClassLoader(dep.classLoader(), ctx.config())); } + IgnitePredicate<ClusterNode> topologyPred = req.getTopologyPredicate(); + + if (topologyPred == null && req.getTopologyPredicateBytes() != null) { + topologyPred = U.unmarshal(marsh, req.getTopologyPredicateBytes(), + U.resolveClassLoader(dep.classLoader(), ctx.config())); + } + // Note that we unmarshal session/job attributes here with proper class loader. GridTaskSessionImpl taskSes = ctx.session().createTaskSession( req.getSessionId(), @@ -1051,6 +1058,7 @@ public class GridJobProcessor extends GridProcessorAdapter { dep, req.getTaskClassName(), req.topology(), + topologyPred, req.getStartTaskTime(), endTime, siblings, http://git-wip-us.apache.org/repos/asf/ignite/blob/24f90874/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java index 91ccf4a..765743c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java @@ -22,12 +22,14 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentMap; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.compute.ComputeJobSibling; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridTaskSessionImpl; import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; @@ -69,6 +71,7 @@ public class GridTaskSessionProcessor extends GridProcessorAdapter { * @param dep Deployment. * @param taskClsName Task class name. * @param top Topology. + * @param topPred Topology predicate. * @param startTime Execution start time. * @param endTime Execution end time. * @param siblings Collection of siblings. @@ -86,6 +89,7 @@ public class GridTaskSessionProcessor extends GridProcessorAdapter { @Nullable GridDeployment dep, String taskClsName, @Nullable Collection<UUID> top, + @Nullable IgnitePredicate<ClusterNode> topPred, long startTime, long endTime, Collection<ComputeJobSibling> siblings, @@ -102,6 +106,7 @@ public class GridTaskSessionProcessor extends GridProcessorAdapter { taskClsName, sesId, top, + topPred, startTime, endTime, siblings, @@ -126,6 +131,7 @@ public class GridTaskSessionProcessor extends GridProcessorAdapter { taskClsName, sesId, top, + topPred, startTime, endTime, siblings, http://git-wip-us.apache.org/repos/asf/ignite/blob/24f90874/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java index 4606b7c..25a38ac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java @@ -70,6 +70,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.security.SecurityPermission; @@ -85,6 +86,7 @@ import static org.apache.ignite.internal.GridTopic.TOPIC_TASK_CANCEL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SKIP_AUTH; import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID; +import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID_PREDICATE; import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBJ_ID; import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TASK_NAME; import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TIMEOUT; @@ -658,12 +660,18 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha if (log.isDebugEnabled()) log.debug("Task deployment: " + dep); - boolean fullSup = dep != null && taskCls!= null && + boolean fullSup = dep != null && taskCls != null && dep.annotation(taskCls, ComputeTaskSessionFullSupport.class) != null; - Collection<? extends ClusterNode> nodes = (Collection<? extends ClusterNode>)map.get(TC_SUBGRID); + Collection<UUID> top = null; - Collection<UUID> top = nodes != null ? F.nodeIds(nodes) : null; + final IgnitePredicate<ClusterNode> topPred = (IgnitePredicate<ClusterNode>)map.get(TC_SUBGRID_PREDICATE); + + if (topPred == null) { + final Collection<ClusterNode> nodes = (Collection<ClusterNode>)map.get(TC_SUBGRID); + + top = nodes != null ? F.nodeIds(nodes) : null; + } UUID subjId = getThreadContext(TC_SUBJ_ID); @@ -685,6 +693,7 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha dep, taskCls == null ? null : taskCls.getName(), top, + topPred, startTime, endTime, Collections.<ComputeJobSibling>emptyList(), http://git-wip-us.apache.org/repos/asf/ignite/blob/24f90874/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java index f0e56c7..92bcd41 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java @@ -30,6 +30,9 @@ public enum GridTaskThreadContextKey { /** Projection for the task. */ TC_SUBGRID, + /** Projection predicate for the task. */ + TC_SUBGRID_PREDICATE, + /** Timeout in milliseconds associated with the task. */ TC_TIMEOUT, http://git-wip-us.apache.org/repos/asf/ignite/blob/24f90874/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index b94a427..25f3029 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@ -1381,6 +1381,8 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { ses.getStartTime(), timeout, ses.getTopology(), + loc ? ses.getTopologyPredicate() : null, + loc ? null : U.marshal(marsh, ses.getTopologyPredicate()), loc ? null : U.marshal(marsh, ses.getJobSiblings()), loc ? ses.getJobSiblings() : null, loc ? null : U.marshal(marsh, sesAttrs), http://git-wip-us.apache.org/repos/asf/ignite/blob/24f90874/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java index 56683b6..f3a19aa 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java @@ -187,10 +187,13 @@ public class GridJobStealingSelfTest extends GridCommonAbstractTest { public void testProjectionPredicateInternalStealing() throws Exception { final Ignite ignite3 = startGrid(3); + final UUID node1 = ignite1.cluster().localNode().id(); + final UUID node3 = ignite3.cluster().localNode().id(); + IgnitePredicate<ClusterNode> p = new P1<ClusterNode>() { @Override public boolean apply(ClusterNode e) { - return ignite1.cluster().localNode().id().equals(e.id()) || - ignite3.cluster().localNode().id().equals(e.id()); // Limit projection with only grid1 or grid3 node. + return node1.equals(e.id()) || + node3.equals(e.id()); // Limit projection with only grid1 or grid3 node. } }; http://git-wip-us.apache.org/repos/asf/ignite/blob/24f90874/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java index b64a6ad..4a76c68 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java @@ -18,12 +18,17 @@ package org.apache.ignite.internal; import java.io.Serializable; +import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; @@ -40,6 +45,7 @@ import org.apache.ignite.spi.failover.jobstealing.JobStealingFailoverSpi; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.testframework.junits.common.GridCommonTest; +import org.eclipse.jetty.util.ConcurrentHashSet; import org.jetbrains.annotations.Nullable; /** @@ -51,6 +57,9 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest private Ignite ignite; /** */ + private static volatile CountDownLatch jobExecutedLatch; + + /** */ public GridMultithreadedJobStealingSelfTest() { super(false /* don't start grid*/); } @@ -77,6 +86,7 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest final AtomicInteger stolen = new AtomicInteger(0); final AtomicInteger noneStolen = new AtomicInteger(0); + final ConcurrentHashSet nodes = new ConcurrentHashSet(); int threadsNum = 10; @@ -84,28 +94,13 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest /** */ @Override public void run() { try { - JobStealingResult res = ignite.compute().execute(JobStealingTask.class, null); + JobStealingResult res = ignite.compute().execute(new JobStealingTask(2), null); info("Task result: " + res); - switch(res) { - case NONE_STOLEN : { - noneStolen.addAndGet(2); - break; - } - case ONE_STOLEN : { - noneStolen.addAndGet(1); - stolen.addAndGet(1); - break; - } - case BOTH_STOLEN: { - stolen.addAndGet(2); - break; - } - default: { - assert false : "Result is: " + res; - } - } + stolen.addAndGet(res.stolen); + noneStolen.addAndGet(res.nonStolen); + nodes.addAll(res.nodes); } catch (IgniteException e) { log.error("Failed to execute task.", e); @@ -119,20 +114,91 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest info("Metrics [nodeId=" + g.cluster().localNode().id() + ", metrics=" + g.cluster().localNode().metrics() + ']'); - assert fail.get() == null : "Test failed with exception: " + fail.get(); + assertNull("Test failed with exception: ",fail.get()); // Total jobs number is threadsNum * 2 - assert stolen.get() + noneStolen.get() == threadsNum * 2 : "Incorrect processed jobs number"; + assertEquals("Incorrect processed jobs number",threadsNum * 2, stolen.get() + noneStolen.get()); - assert stolen.get() != 0 : "No jobs were stolen."; + assertFalse( "No jobs were stolen.",stolen.get() == 0); + + for (Ignite g : G.allGrids()) + assertTrue("Node get no jobs.", nodes.contains(g.name())); // Under these circumstances we should not have more than 2 jobs // difference. //(but muted to 4 due to very rare fails and low priority of fix) - assert Math.abs(stolen.get() - noneStolen.get()) <= 4 : "Stats [stolen=" + stolen + - ", noneStolen=" + noneStolen + ']'; + assertTrue( "Stats [stolen=" + stolen + ", noneStolen=" + noneStolen + ']', + Math.abs(stolen.get() - noneStolen.get()) <= 4); + } + + /** + * Test newly joined node can steal jobs. + * + * @throws Exception If test failed. + */ + public void testJoinedNodeCanStealJobs() throws Exception { + final AtomicReference<Exception> fail = new AtomicReference<>(null); + + final AtomicInteger stolen = new AtomicInteger(0); + final AtomicInteger noneStolen = new AtomicInteger(0); + final ConcurrentHashSet nodes = new ConcurrentHashSet(); + + int threadsNum = 10; + + final int jobsPerTask = 4; + + jobExecutedLatch = new CountDownLatch(threadsNum); + + final IgniteInternalFuture<Long> future = GridTestUtils.runMultiThreadedAsync(new Runnable() { + /** */ + @Override public void run() { + try { + final IgniteCompute compute = ignite.compute().withAsync(); + + compute.execute(new JobStealingTask(jobsPerTask), null); + + JobStealingResult res = (JobStealingResult)compute.future().get(); + + info("Task result: " + res); + + stolen.addAndGet(res.stolen); + noneStolen.addAndGet(res.nonStolen); + nodes.addAll(res.nodes); + } + catch (IgniteException e) { + log.error("Failed to execute task.", e); + + fail.getAndSet(e); + } + } + }, threadsNum, "JobStealingThread"); + + //Wait for first job begin execution. + jobExecutedLatch.await(); + + startGrid(2); + + for (Ignite g : G.allGrids()) + info("Metrics [nodeId=" + g.cluster().localNode().id() + + ", metrics=" + g.cluster().localNode().metrics() + ']'); + + future.get(); + + assertNull("Test failed with exception: ",fail.get()); + + // Total jobs number is threadsNum * 3 + assertEquals("Incorrect processed jobs number",threadsNum * jobsPerTask, stolen.get() + noneStolen.get()); + + assertFalse( "No jobs were stolen.",stolen.get() == 0); + + for (Ignite g : G.allGrids()) + assertTrue("Node get no jobs.", nodes.contains(g.name())); + + assertTrue( "Stats [stolen=" + stolen + ", noneStolen=" + noneStolen + ']', + Math.abs(stolen.get() - 2 * noneStolen.get()) <= 6); } + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); @@ -166,38 +232,50 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest @LoggerResource private IgniteLogger log; + /** */ + private int jobsToRun; + + /** */ + public JobStealingTask(int jobsToRun) { + this.jobsToRun = jobsToRun; + } + /** {@inheritDoc} */ @SuppressWarnings("ForLoopReplaceableByForEach") - @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Object arg) { assert subgrid.size() == 2 : "Invalid subgrid size: " + subgrid.size(); Map<ComputeJobAdapter, ClusterNode> map = new HashMap<>(subgrid.size()); // Put all jobs onto local node. - for (int i = 0; i < subgrid.size(); i++) - map.put(new GridJobStealingJob(2000L), ignite.cluster().localNode()); + for (int i = 0; i < jobsToRun; i++) + map.put(new GridJobStealingJob(3000L), ignite.cluster().localNode()); return map; } /** {@inheritDoc} */ @Override public JobStealingResult reduce(List<ComputeJobResult> results) { - assert results.size() == 2; + int stolen = 0; + int nonStolen = 0; + + Set<String> nodes = new HashSet<>(results.size()); - for (ComputeJobResult res : results) - log.info("Job result: " + res.getData()); + for (ComputeJobResult res : results) { + String data = res.getData(); - Object obj0 = results.get(0).getData(); + log.info("Job result: " + data); - if (obj0.equals(results.get(1).getData())) { - if (obj0.equals(ignite.name())) - return JobStealingResult.NONE_STOLEN; + nodes.add(data); - return JobStealingResult.BOTH_STOLEN; + if (!data.equals(ignite.name())) + stolen++; + else + nonStolen++; } - return JobStealingResult.ONE_STOLEN; + return new JobStealingResult(stolen, nonStolen, nodes); } } @@ -219,6 +297,8 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest /** {@inheritDoc} */ @Override public Serializable execute() { try { + jobExecutedLatch.countDown(); + Long sleep = argument(0); assert sleep != null; @@ -236,14 +316,30 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest /** * Job stealing result. */ - private enum JobStealingResult { + private static class JobStealingResult { + /** */ + int stolen; + /** */ - BOTH_STOLEN, + int nonStolen; /** */ - ONE_STOLEN, + Set nodes; /** */ - NONE_STOLEN + public JobStealingResult(int stolen, int nonStolen, Set nodes) { + this.stolen = stolen; + this.nonStolen = nonStolen; + this.nodes = nodes; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "JobStealingResult{" + + "stolen=" + stolen + + ", nonStolen=" + nonStolen + + ", nodes=" + Arrays.toString(nodes.toArray()) + + '}'; + } } } \ No newline at end of file
