IGNITE-2310 Lock cache partition for affinityRun/affinityCall execution
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/01800101 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/01800101 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/01800101 Branch: refs/heads/ignite-3661 Commit: 018001011daff723d120834da7b4f57bad7f8f71 Parents: 00f47d7 Author: tledkov-gridgain <[email protected]> Authored: Fri May 27 15:16:27 2016 +0300 Committer: tledkov-gridgain <[email protected]> Committed: Tue Aug 9 12:50:23 2016 +0300 ---------------------------------------------------------------------- .../ignite/codegen/MessageCodeGenerator.java | 6 +- .../java/org/apache/ignite/IgniteCompute.java | 69 +- .../ignite/internal/GridJobExecuteRequest.java | 148 +++- .../ignite/internal/GridJobExecuteResponse.java | 42 +- .../ignite/internal/IgniteComputeImpl.java | 120 ++- .../failover/GridFailoverContextImpl.java | 27 +- .../managers/failover/GridFailoverManager.java | 17 +- .../affinity/GridAffinityProcessor.java | 18 + .../processors/cache/GridCacheSwapManager.java | 22 +- .../distributed/dht/GridDhtCacheAdapter.java | 19 +- .../distributed/dht/GridDhtLocalPartition.java | 33 +- .../cache/distributed/dht/GridReservable.java | 5 +- .../cache/query/GridCacheQueryManager.java | 10 +- .../processors/closure/AffinityTask.java | 17 +- .../closure/GridClosureProcessor.java | 142 +++- .../processors/job/GridJobProcessor.java | 139 ++- .../internal/processors/job/GridJobWorker.java | 203 +++-- .../processors/query/GridQueryProcessor.java | 22 +- .../processors/task/GridTaskWorker.java | 235 +++-- .../ignite/internal/util/IgniteUtils.java | 10 + .../ignite/spi/failover/FailoverContext.java | 15 +- .../spi/failover/always/AlwaysFailoverSpi.java | 15 +- .../GridJobMasterLeaveAwareSelfTest.java | 4 +- ...ectionLocalJobMultipleArgumentsSelfTest.java | 4 +- .../GridTaskFailoverAffinityRunTest.java | 2 +- .../IgniteComputeEmptyClusterGroupTest.java | 8 +- 
.../binary/GridBinaryAffinityKeySelfTest.java | 6 +- ...acheAbstractUsersAffinityMapperSelfTest.java | 2 +- ...niteDynamicCacheStartStopConcurrentTest.java | 2 +- .../spi/failover/GridFailoverTestContext.java | 6 + ...eLockPartitionOnAffinityRunAbstractTest.java | 412 +++++++++ ...PartitionOnAffinityRunAtomicCacheOpTest.java | 329 +++++++ ...niteCacheLockPartitionOnAffinityRunTest.java | 852 +++++++++++++++++++ ...LockPartitionOnAffinityRunTxCacheOpTest.java | 33 + ...titionOnAffinityRunWithCollisionSpiTest.java | 204 +++++ .../IgniteCacheAffinityRunTestSuite.java | 45 + 36 files changed, 2961 insertions(+), 282 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java ---------------------------------------------------------------------- diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java index a6ae0da..5cfd9c5 100644 --- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java +++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java @@ -744,14 +744,14 @@ public class MessageCodeGenerator { else if (type.isArray()) { Class<?> compType = type.getComponentType(); - returnFalseIfReadFailed(name, "reader.readObjectArray", field, setExpr, + returnFalseIfReadFailed(name, "reader.readObjectArray", setExpr, field, "MessageCollectionItemType." + typeEnum(compType), compType.getSimpleName() + ".class"); } else if (Collection.class.isAssignableFrom(type) && !Set.class.isAssignableFrom(type)) { assert colItemType != null; - returnFalseIfReadFailed(name, "reader.readCollection", field, setExpr, + returnFalseIfReadFailed(name, "reader.readCollection", setExpr, field, "MessageCollectionItemType." 
+ typeEnum(colItemType)); } else if (Map.class.isAssignableFrom(type)) { @@ -760,7 +760,7 @@ public class MessageCodeGenerator { boolean linked = type.equals(LinkedHashMap.class); - returnFalseIfReadFailed(name, "reader.readMap", field, setExpr, + returnFalseIfReadFailed(name, "reader.readMap", setExpr, field, "MessageCollectionItemType." + typeEnum(mapKeyType), "MessageCollectionItemType." + typeEnum(mapValType), linked ? "true" : "false"); http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java b/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java index f7d4bc5..212849a 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java @@ -38,6 +38,7 @@ import org.apache.ignite.resources.SpringResource; import org.apache.ignite.resources.TaskSessionResource; import org.apache.ignite.spi.failover.FailoverSpi; import org.apache.ignite.spi.loadbalancing.LoadBalancingSpi; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; /** @@ -122,7 +123,8 @@ public interface IgniteCompute extends IgniteAsyncSupport { /** * Executes given job on the node where data for provided affinity key is located - * (a.k.a. affinity co-location). + * (a.k.a. affinity co-location). The data of the partition where affKey is stored + * will not be migrated from the target node while the job is executed. * * @param cacheName Name of the cache to use for affinity co-location. * @param affKey Affinity key. @@ -134,7 +136,38 @@ public interface IgniteCompute extends IgniteAsyncSupport { /** * Executes given job on the node where data for provided affinity key is located - * (a.k.a. affinity co-location). + * (a.k.a. affinity co-location). 
The data of the partition where affKey is stored + * will not be migrated from the target node while the job is executed. The data + * of the extra caches' partitions with the same partition number also will not be migrated. + * + * @param cacheNames Names of the caches to to reserve the partition. The first cache uses for affinity co-location. + * @param affKey Affinity key. + * @param job Job which will be co-located on the node with given affinity key. + * @throws IgniteException If job failed. + */ + @IgniteAsyncSupported + public void affinityRun(@NotNull Collection<String> cacheNames, Object affKey, IgniteRunnable job) + throws IgniteException; + + /** + * Executes given job on the node where partition is located (the partition is primary on the node) + * The data of the partition will not be migrated from the target node + * while the job is executed. The data of the extra caches' partitions with the same partition number + * also will not be migrated. + * + * @param cacheNames Names of the caches to to reserve the partition. The first cache uses for affinity co-location. + * @param partId Partition number. + * @param job Job which will be co-located on the node with given affinity key. + * @throws IgniteException If job failed. + */ + @IgniteAsyncSupported + public void affinityRun(@NotNull Collection<String> cacheNames, int partId, IgniteRunnable job) + throws IgniteException; + + /** + * Executes given job on the node where data for provided affinity key is located + * (a.k.a. affinity co-location). The data of the partition where affKey is stored + * will not be migrated from the target node while the job is executed. * * @param cacheName Name of the cache to use for affinity co-location. * @param affKey Affinity key. 
@@ -146,6 +179,38 @@ public interface IgniteCompute extends IgniteAsyncSupport { public <R> R affinityCall(@Nullable String cacheName, Object affKey, IgniteCallable<R> job) throws IgniteException; /** + * Executes given job on the node where data for provided affinity key is located + * (a.k.a. affinity co-location). The data of the partition where affKey is stored + * will not be migrated from the target node while the job is executed. The data + * of the extra caches' partitions with the same partition number also will not be migrated. + * + * @param cacheNames Names of the caches to to reserve the partition. The first cache uses for affinity co-location. + * @param affKey Affinity key. + * @param job Job which will be co-located on the node with given affinity key. + * @return Job result. + * @throws IgniteException If job failed. + */ + @IgniteAsyncSupported + public <R> R affinityCall(@NotNull Collection<String> cacheNames, Object affKey, IgniteCallable<R> job) + throws IgniteException; + + /** + * Executes given job on the node where partition is located (the partition is primary on the node) + * The data of the partition will not be migrated from the target node + * while the job is executed. The data of the extra caches' partitions with the same partition number + * also will not be migrated. + * + * @param cacheNames Names of the caches to to reserve the partition. The first cache uses for affinity co-location. + * @param partId Partition to reserve. + * @param job Job which will be co-located on the node with given affinity key. + * @return Job result. + * @throws IgniteException If job failed. + */ + @IgniteAsyncSupported + public <R> R affinityCall(@NotNull Collection<String> cacheNames, int partId, IgniteCallable<R> job) + throws IgniteException; + + /** * Executes given task on within the cluster group. For step-by-step explanation of task execution process * refer to {@link ComputeTask} documentation. 
* http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java index 28b4094..ed431d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java @@ -26,6 +26,7 @@ import java.util.UUID; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.ComputeJobSibling; import org.apache.ignite.configuration.DeploymentMode; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; @@ -136,6 +137,15 @@ public class GridJobExecuteRequest implements Message { @GridDirectCollection(UUID.class) private Collection<UUID> top; + /** */ + private int[] idsOfCaches; + + /** */ + private int part; + + /** */ + private AffinityTopologyVersion topVer; + /** * No-op constructor to support {@link Externalizable} interface. */ @@ -169,6 +179,9 @@ public class GridJobExecuteRequest implements Message { * @param sesFullSup {@code True} if session attributes are disabled. * @param internal {@code True} if internal job. * @param subjId Subject ID. + * @param cacheIds Caches' identifiers to reserve partition. + * @param part Partition to lock. + * @param topVer Affinity topology version of job mapping. 
*/ public GridJobExecuteRequest( IgniteUuid sesId, @@ -195,7 +208,10 @@ public class GridJobExecuteRequest implements Message { boolean forceLocDep, boolean sesFullSup, boolean internal, - UUID subjId) { + UUID subjId, + @Nullable int[] cacheIds, + int part, + @Nullable AffinityTopologyVersion topVer) { this.top = top; assert sesId != null; assert jobId != null; @@ -232,6 +248,9 @@ public class GridJobExecuteRequest implements Message { this.sesFullSup = sesFullSup; this.internal = internal; this.subjId = subjId; + this.idsOfCaches = cacheIds; + this.part = part; + this.topVer = topVer; this.cpSpi = cpSpi == null || cpSpi.isEmpty() ? null : cpSpi; } @@ -421,6 +440,27 @@ public class GridJobExecuteRequest implements Message { return subjId; } + /** + * @return Caches' identifiers to reserve specified partition for job execution. + */ + public int[] getCacheIds() { + return idsOfCaches; + } + + /** + * @return Partitions to lock for job execution. + */ + public int getPartition() { + return part; + } + + /** + * @return Affinity version which was used to map job + */ + public AffinityTopologyVersion getTopVer() { + return topVer; + } + /** {@inheritDoc} */ @Override public void onAckReceived() { // No-op. 
@@ -469,96 +509,114 @@ public class GridJobExecuteRequest implements Message { writer.incrementState(); case 5: - if (!writer.writeBoolean("internal", internal)) + if (!writer.writeIntArray("idsOfCaches", idsOfCaches)) return false; writer.incrementState(); case 6: - if (!writer.writeByteArray("jobAttrsBytes", jobAttrsBytes)) + if (!writer.writeBoolean("internal", internal)) return false; writer.incrementState(); case 7: - if (!writer.writeByteArray("jobBytes", jobBytes)) + if (!writer.writeByteArray("jobAttrsBytes", jobAttrsBytes)) return false; writer.incrementState(); case 8: - if (!writer.writeIgniteUuid("jobId", jobId)) + if (!writer.writeByteArray("jobBytes", jobBytes)) return false; writer.incrementState(); case 9: - if (!writer.writeMap("ldrParticipants", ldrParticipants, MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID)) + if (!writer.writeIgniteUuid("jobId", jobId)) return false; writer.incrementState(); case 10: - if (!writer.writeByteArray("sesAttrsBytes", sesAttrsBytes)) + if (!writer.writeMap("ldrParticipants", ldrParticipants, MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID)) return false; writer.incrementState(); case 11: - if (!writer.writeBoolean("sesFullSup", sesFullSup)) + if (!writer.writeInt("part", part)) return false; writer.incrementState(); case 12: - if (!writer.writeIgniteUuid("sesId", sesId)) + if (!writer.writeByteArray("sesAttrsBytes", sesAttrsBytes)) return false; writer.incrementState(); case 13: - if (!writer.writeByteArray("siblingsBytes", siblingsBytes)) + if (!writer.writeBoolean("sesFullSup", sesFullSup)) return false; writer.incrementState(); case 14: - if (!writer.writeLong("startTaskTime", startTaskTime)) + if (!writer.writeIgniteUuid("sesId", sesId)) return false; writer.incrementState(); case 15: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeByteArray("siblingsBytes", siblingsBytes)) return false; writer.incrementState(); case 16: - if 
(!writer.writeString("taskClsName", taskClsName)) + if (!writer.writeLong("startTaskTime", startTaskTime)) return false; writer.incrementState(); case 17: - if (!writer.writeString("taskName", taskName)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 18: - if (!writer.writeLong("timeout", timeout)) + if (!writer.writeString("taskClsName", taskClsName)) return false; writer.incrementState(); case 19: - if (!writer.writeCollection("top", top, MessageCollectionItemType.UUID)) + if (!writer.writeString("taskName", taskName)) return false; writer.incrementState(); case 20: + if (!writer.writeLong("timeout", timeout)) + return false; + + writer.incrementState(); + + case 21: + if (!writer.writeCollection("top", top, MessageCollectionItemType.UUID)) + return false; + + writer.incrementState(); + + case 22: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + + case 23: if (!writer.writeString("userVer", userVer)) return false; @@ -622,7 +680,7 @@ public class GridJobExecuteRequest implements Message { reader.incrementState(); case 5: - internal = reader.readBoolean("internal"); + idsOfCaches = reader.readIntArray("idsOfCaches"); if (!reader.isLastRead()) return false; @@ -630,7 +688,7 @@ public class GridJobExecuteRequest implements Message { reader.incrementState(); case 6: - jobAttrsBytes = reader.readByteArray("jobAttrsBytes"); + internal = reader.readBoolean("internal"); if (!reader.isLastRead()) return false; @@ -638,7 +696,7 @@ public class GridJobExecuteRequest implements Message { reader.incrementState(); case 7: - jobBytes = reader.readByteArray("jobBytes"); + jobAttrsBytes = reader.readByteArray("jobAttrsBytes"); if (!reader.isLastRead()) return false; @@ -646,7 +704,7 @@ public class GridJobExecuteRequest implements Message { reader.incrementState(); case 8: - jobId = reader.readIgniteUuid("jobId"); + jobBytes = reader.readByteArray("jobBytes"); if (!reader.isLastRead()) return 
false; @@ -654,7 +712,7 @@ public class GridJobExecuteRequest implements Message { reader.incrementState(); case 9: - ldrParticipants = reader.readMap("ldrParticipants", MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false); + jobId = reader.readIgniteUuid("jobId"); if (!reader.isLastRead()) return false; @@ -662,7 +720,7 @@ public class GridJobExecuteRequest implements Message { reader.incrementState(); case 10: - sesAttrsBytes = reader.readByteArray("sesAttrsBytes"); + ldrParticipants = reader.readMap("ldrParticipants", MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false); if (!reader.isLastRead()) return false; @@ -670,7 +728,7 @@ public class GridJobExecuteRequest implements Message { reader.incrementState(); case 11: - sesFullSup = reader.readBoolean("sesFullSup"); + part = reader.readInt("part"); if (!reader.isLastRead()) return false; @@ -678,7 +736,7 @@ public class GridJobExecuteRequest implements Message { reader.incrementState(); case 12: - sesId = reader.readIgniteUuid("sesId"); + sesAttrsBytes = reader.readByteArray("sesAttrsBytes"); if (!reader.isLastRead()) return false; @@ -686,7 +744,7 @@ public class GridJobExecuteRequest implements Message { reader.incrementState(); case 13: - siblingsBytes = reader.readByteArray("siblingsBytes"); + sesFullSup = reader.readBoolean("sesFullSup"); if (!reader.isLastRead()) return false; @@ -694,7 +752,7 @@ public class GridJobExecuteRequest implements Message { reader.incrementState(); case 14: - startTaskTime = reader.readLong("startTaskTime"); + sesId = reader.readIgniteUuid("sesId"); if (!reader.isLastRead()) return false; @@ -702,7 +760,7 @@ public class GridJobExecuteRequest implements Message { reader.incrementState(); case 15: - subjId = reader.readUuid("subjId"); + siblingsBytes = reader.readByteArray("siblingsBytes"); if (!reader.isLastRead()) return false; @@ -710,7 +768,7 @@ public class GridJobExecuteRequest implements Message { reader.incrementState(); 
case 16: - taskClsName = reader.readString("taskClsName"); + startTaskTime = reader.readLong("startTaskTime"); if (!reader.isLastRead()) return false; @@ -718,7 +776,7 @@ public class GridJobExecuteRequest implements Message { reader.incrementState(); case 17: - taskName = reader.readString("taskName"); + subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) return false; @@ -726,7 +784,7 @@ public class GridJobExecuteRequest implements Message { reader.incrementState(); case 18: - timeout = reader.readLong("timeout"); + taskClsName = reader.readString("taskClsName"); if (!reader.isLastRead()) return false; @@ -734,7 +792,7 @@ public class GridJobExecuteRequest implements Message { reader.incrementState(); case 19: - top = reader.readCollection("top", MessageCollectionItemType.UUID); + taskName = reader.readString("taskName"); if (!reader.isLastRead()) return false; @@ -742,6 +800,30 @@ public class GridJobExecuteRequest implements Message { reader.incrementState(); case 20: + timeout = reader.readLong("timeout"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 21: + top = reader.readCollection("top", MessageCollectionItemType.UUID); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 22: + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 23: userVer = reader.readString("userVer"); if (!reader.isLastRead()) @@ -761,11 +843,11 @@ public class GridJobExecuteRequest implements Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 21; + return 24; } /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridJobExecuteRequest.class, this); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java index bfbd859..9724bc0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; @@ -75,6 +76,9 @@ public class GridJobExecuteResponse implements Message { @GridDirectTransient private IgniteException fakeEx; + /** */ + private AffinityTopologyVersion retry; + /** * No-op constructor to support {@link Externalizable} interface. This * constructor is not meant to be used for other purposes. @@ -94,6 +98,7 @@ public class GridJobExecuteResponse implements Message { * @param jobAttrsBytes Serialized job attributes. * @param jobAttrs Job attributes. * @param isCancelled Whether job was cancelled or not. + * @param retry Topology version for that partitions haven't been reserved on the affinity node. 
*/ public GridJobExecuteResponse(UUID nodeId, IgniteUuid sesId, @@ -104,7 +109,8 @@ public class GridJobExecuteResponse implements Message { Object res, byte[] jobAttrsBytes, Map<Object, Object> jobAttrs, - boolean isCancelled) + boolean isCancelled, + AffinityTopologyVersion retry) { assert nodeId != null; assert sesId != null; @@ -120,6 +126,7 @@ public class GridJobExecuteResponse implements Message { this.jobAttrsBytes = jobAttrsBytes; this.jobAttrs = jobAttrs; this.isCancelled = isCancelled; + this.retry = retry; } /** @@ -206,6 +213,21 @@ public class GridJobExecuteResponse implements Message { this.fakeEx = fakeEx; } + /** + * @return {@code True} if need retry job. + */ + public boolean retry() { + return retry != null; + } + + /** + * @return Topology version for that specified partitions haven't been reserved + * on the affinity node. + */ + public AffinityTopologyVersion getRetryTopologyVersion() { + return retry != null ? retry : AffinityTopologyVersion.NONE; + } + /** {@inheritDoc} */ @Override public void onAckReceived() { // No-op. 
@@ -260,6 +282,12 @@ public class GridJobExecuteResponse implements Message { writer.incrementState(); case 6: + if (!writer.writeMessage("retry", retry)) + return false; + + writer.incrementState(); + + case 7: if (!writer.writeIgniteUuid("sesId", sesId)) return false; @@ -327,6 +355,14 @@ public class GridJobExecuteResponse implements Message { reader.incrementState(); case 6: + retry = reader.readMessage("retry"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 7: sesId = reader.readIgniteUuid("sesId"); if (!reader.isLastRead()) @@ -346,11 +382,11 @@ public class GridJobExecuteResponse implements Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 7; + return 8; } /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridJobExecuteResponse.class, this); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java index 15ad15f..26c6797 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java @@ -35,6 +35,7 @@ import org.apache.ignite.compute.ComputeTask; import org.apache.ignite.compute.ComputeTaskFuture; import org.apache.ignite.internal.cluster.ClusterGroupAdapter; import org.apache.ignite.internal.managers.deployment.GridDeployment; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteCallable; @@ -43,6 +44,7 @@ import org.apache.ignite.lang.IgniteFuture; import 
org.apache.ignite.lang.IgniteReducer; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.GridClosureCallMode.BALANCE; @@ -109,7 +111,64 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> guard(); try { - saveOrGet(ctx.closure().affinityRun(cacheName, affKey, job, prj.nodes())); + // In case cache key is passed instead of affinity key. + final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey); + int partId = ctx.affinity().partition(cacheName, affKey0); + + if (partId < 0) + throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key=" + + affKey + ']'); + + saveOrGet(ctx.closure().affinityRun(Collections.singletonList(cacheName), partId, affKey, + job, prj.nodes())); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public void affinityRun(@NotNull Collection<String> cacheNames, Object affKey, IgniteRunnable job) { + A.notNull(affKey, "affKey"); + A.notNull(job, "job"); + A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty"); + + guard(); + + try { + final String cacheName = F.first(cacheNames); + + // In case cache key is passed instead of affinity key. 
+ final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey); + int partId = ctx.affinity().partition(cacheName, affKey0); + + if (partId < 0) + throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key=" + + affKey + ']'); + + saveOrGet(ctx.closure().affinityRun(cacheNames, partId, affKey, job, prj.nodes())); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public void affinityRun(@NotNull Collection<String> cacheNames, int partId, IgniteRunnable job) { + A.ensure(partId >= 0, "partId = " + partId); + A.notNull(job, "job"); + A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty"); + + guard(); + + try { + saveOrGet(ctx.closure().affinityRun(cacheNames, partId, null, job, prj.nodes())); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -127,7 +186,64 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> guard(); try { - return saveOrGet(ctx.closure().affinityCall(cacheName, affKey, job, prj.nodes())); + // In case cache key is passed instead of affinity key. 
+ final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey); + int partId = ctx.affinity().partition(cacheName, affKey0); + + if (partId < 0) + throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key=" + + affKey + ']'); + + return saveOrGet(ctx.closure().affinityCall(Collections.singletonList(cacheName), partId, affKey, job, + prj.nodes())); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public <R> R affinityCall(@NotNull Collection<String> cacheNames, Object affKey, IgniteCallable<R> job) { + A.notNull(affKey, "affKey"); + A.notNull(job, "job"); + A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty"); + + guard(); + + try { + final String cacheName = F.first(cacheNames); + + // In case cache key is passed instead of affinity key. + final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey); + int partId = ctx.affinity().partition(cacheName, affKey0); + + if (partId < 0) + throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key=" + + affKey + ']'); + + return saveOrGet(ctx.closure().affinityCall(cacheNames, partId, affKey, job, prj.nodes())); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public <R> R affinityCall(@NotNull Collection<String> cacheNames, int partId, IgniteCallable<R> job) { + A.ensure(partId >= 0, "partId = " + partId); + A.notNull(job, "job"); + A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty"); + + guard(); + + try { + return saveOrGet(ctx.closure().affinityCall(cacheNames, partId, null, job, prj.nodes())); } catch (IgniteCheckedException e) { throw U.convertException(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java index 3985df7..ad77271 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java @@ -23,6 +23,7 @@ import org.apache.ignite.compute.ComputeJobResult; import org.apache.ignite.compute.ComputeTaskSession; import org.apache.ignite.internal.GridTaskSessionImpl; import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.spi.failover.FailoverContext; @@ -42,26 +43,36 @@ public class GridFailoverContextImpl implements FailoverContext { @GridToStringExclude private final GridLoadBalancerManager loadMgr; + /** Partition key for affinityCall. */ + private final int partId; + /** Affinity key for affinityCall. */ private final Object affKey; /** Affinity cache name for affinityCall. */ private final String affCacheName; + /** Affinity topology version. */ + private final AffinityTopologyVersion topVer; + /** * Initializes failover context. * * @param taskSes Grid task session. * @param jobRes Failed job result. * @param loadMgr Load manager. + * @param partId Partition. * @param affKey Affinity key. * @param affCacheName Affinity cache name. + * @param topVer Affinity topology version. 
*/ public GridFailoverContextImpl(GridTaskSessionImpl taskSes, ComputeJobResult jobRes, GridLoadBalancerManager loadMgr, + int partId, @Nullable Object affKey, - @Nullable String affCacheName) { + @Nullable String affCacheName, + @Nullable AffinityTopologyVersion topVer) { assert taskSes != null; assert jobRes != null; assert loadMgr != null; @@ -69,8 +80,10 @@ public class GridFailoverContextImpl implements FailoverContext { this.taskSes = taskSes; this.jobRes = jobRes; this.loadMgr = loadMgr; + this.partId = partId; this.affKey = affKey; this.affCacheName = affCacheName; + this.topVer = topVer; } /** {@inheritDoc} */ @@ -99,6 +112,18 @@ public class GridFailoverContextImpl implements FailoverContext { } /** {@inheritDoc} */ + public int partition() { + return partId; + } + + /** + * @return Affinity topology version. + */ + @Nullable public AffinityTopologyVersion affinityTopologyVersion() { + return topVer; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridFailoverContextImpl.class, this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java index fa22b62..52edd1d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java @@ -24,6 +24,7 @@ import org.apache.ignite.compute.ComputeJobResult; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridTaskSessionImpl; import org.apache.ignite.internal.managers.GridManagerAdapter; +import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.spi.failover.FailoverSpi; import org.jetbrains.annotations.Nullable; @@ -58,16 +59,26 @@ public class GridFailoverManager extends GridManagerAdapter<FailoverSpi> { * @param taskSes Task session. * @param jobRes Job result. * @param top Collection of all topology nodes. + * @param affPartId Partition number. * @param affKey Affinity key. * @param affCacheName Affinity cache name. + * @param topVer Affinity topology version. * @return New node to route this job to. */ public ClusterNode failover(GridTaskSessionImpl taskSes, ComputeJobResult jobRes, List<ClusterNode> top, + int affPartId, @Nullable Object affKey, - @Nullable String affCacheName) { - return getSpi(taskSes.getFailoverSpi()).failover(new GridFailoverContextImpl(taskSes, jobRes, - ctx.loadBalancing(), affKey, affCacheName), top); + @Nullable String affCacheName, + @Nullable AffinityTopologyVersion topVer) { + return getSpi(taskSes.getFailoverSpi()).failover(new GridFailoverContextImpl(taskSes, + jobRes, + ctx.loadBalancing(), + affPartId, + affKey, + affCacheName, + topVer), + top); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java index 19e0842..1726d02 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java @@ -195,6 +195,24 @@ public class GridAffinityProcessor extends GridProcessorAdapter { } /** + * Maps partition to a 
node. + * + * @param cacheName Cache name. + * @param partId partition. + * @param topVer Affinity topology version. + * @return Picked node. + * @throws IgniteCheckedException If failed. + */ + @Nullable public ClusterNode mapPartitionToNode(@Nullable String cacheName, int partId, + AffinityTopologyVersion topVer) + throws IgniteCheckedException { + AffinityInfo affInfo = affinityCache(cacheName, topVer); + + return affInfo != null ? F.first(affInfo.assignment().get(partId)) : null; + } + + + /** * Maps keys to nodes for given cache. * * @param cacheName Cache name. http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index cc3261c..fd0b471 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -1997,6 +1997,22 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawSwapIterator(boolean primary, boolean backup) throws IgniteCheckedException { + return rawSwapIterator(primary, backup, cctx.affinity().affinityTopologyVersion()); + } + + /** + * @return Raw off-heap iterator. + * @param primary Include primaries. + * @param backup Include backups. + * @param topVer Affinity topology version. + * @throws IgniteCheckedException If failed. 
+ */ + public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawSwapIterator(boolean primary, boolean backup, + AffinityTopologyVersion topVer) + throws IgniteCheckedException + { + assert topVer != null; + if (!swapEnabled || (!primary && !backup)) return new GridEmptyCloseableIterator<>(); @@ -2005,10 +2021,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { if (primary && backup) return swapMgr.rawIterator(spaceName); - AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion(); - - Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) : - cctx.affinity().backupPartitions(cctx.localNodeId(), ver); + Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), topVer) : + cctx.affinity().backupPartitions(cctx.localNodeId(), topVer); return new CloseablePartitionsIterator<Map.Entry<byte[], byte[]>, Map.Entry<byte[], byte[]>>(parts) { @Override protected GridCloseableIterator<Map.Entry<byte[], byte[]>> partitionIterator(int part) http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 14468eb..35e6267 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -1229,13 +1229,28 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap public Iterator<Cache.Entry<K, V>> localEntriesIterator(final boolean primary, final boolean backup, final 
boolean keepBinary) { + return localEntriesIterator(primary, + backup, + keepBinary, + ctx.affinity().affinityTopologyVersion()); + } + + /** + * @param primary If {@code true} includes primary entries. + * @param backup If {@code true} includes backup entries. + * @param keepBinary Keep binary flag. + * @param topVer Specified affinity topology version. + * @return Local entries iterator. + */ + public Iterator<Cache.Entry<K, V>> localEntriesIterator(final boolean primary, + final boolean backup, + final boolean keepBinary, + final AffinityTopologyVersion topVer) { assert primary || backup; if (primary && backup) return iterator(entries().iterator(), !keepBinary); else { - final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); - final Iterator<GridDhtLocalPartition> partIt = topology().currentLocalPartitions().iterator(); Iterator<GridCacheMapEntry> it = new Iterator<GridCacheMapEntry>() { http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 5061136..39a3e08 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -117,6 +117,10 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, /** Update counter. */ private final AtomicLong cntr = new AtomicLong(); + /** Set if failed to move partition to RENTING state due to reservations, to be checked when + * reservation is released. 
*/ + private volatile boolean shouldBeRenting; + /** * @param cctx Context. * @param id Partition ID. @@ -411,6 +415,9 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, // Decrement reservations. if (state.compareAndSet(reservations, --reservations)) { + if ((reservations & 0xFFFF) == 0 && shouldBeRenting) + rent(true); + tryEvict(); break; @@ -461,24 +468,24 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, * @return Future to signal that this node is no longer an owner or backup. */ IgniteInternalFuture<?> rent(boolean updateSeq) { - while (true) { - long reservations = state.get(); + long reservations = state.get(); - int ord = (int)(reservations >> 32); + int ord = (int)(reservations >> 32); - if (ord == RENTING.ordinal() || ord == EVICTED.ordinal()) - return rent; + if (ord == RENTING.ordinal() || ord == EVICTED.ordinal()) + return rent; - if (casState(reservations, RENTING)) { - if (log.isDebugEnabled()) - log.debug("Moved partition to RENTING state: " + this); + shouldBeRenting = true; - // Evict asynchronously, as the 'rent' method may be called - // from within write locks on local partition. - tryEvictAsync(updateSeq); + if ((reservations & 0xFFFF) == 0 && casState(reservations, RENTING)) { + shouldBeRenting = false; - break; - } + if (log.isDebugEnabled()) + log.debug("Moved partition to RENTING state: " + this); + + // Evict asynchronously, as the 'rent' method may be called + // from within write locks on local partition. 
+ tryEvictAsync(updateSeq); } return rent; http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridReservable.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridReservable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridReservable.java index 51f22bc..068c68d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridReservable.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridReservable.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; +import org.apache.ignite.IgniteCheckedException; + /** * Reservations support. */ @@ -25,8 +27,9 @@ public interface GridReservable { * Reserves. * * @return {@code true} If reserved successfully. + * @throws IgniteCheckedException If failed. */ - public boolean reserve(); + public boolean reserve() throws IgniteCheckedException; /** * Releases. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 6729d41..163bac5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -846,7 +846,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte final ExpiryPolicy plc = cctx.expiry(); - final AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion topVer = GridQueryProcessor.getRequestAffinityTopologyVersion(); + + if (topVer == null) + topVer = cctx.affinity().affinityTopologyVersion(); final boolean backups = qry.includeBackups() || cctx.isReplicated(); @@ -935,7 +938,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte Integer part = qry.partition(); - Iterator<Map.Entry<byte[], byte[]>> it = part == null ? cctx.swap().rawSwapIterator(true, backups) : + Iterator<Map.Entry<byte[], byte[]>> it = part == null ? cctx.swap().rawSwapIterator(true, backups, topVer) : cctx.swap().rawSwapIterator(part); if (expPlc != null) @@ -978,8 +981,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte GridDhtCacheAdapter<K, V> cache = cctx.isNear() ? 
cctx.near().dht() : cctx.dht(); final Iterator<Cache.Entry<K, V>> iter = cache.localEntriesIterator(true, - backups, - cache.context().keepBinary()); + backups, cache.context().keepBinary(), topVer); return new GridIteratorAdapter<IgniteBiTuple<K, V>>() { /** */ http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java index da00f01..9007c8b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.closure; +import java.util.Collection; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.jetbrains.annotations.Nullable; /** @@ -26,10 +28,21 @@ public interface AffinityTask { /** * @return Affinity key. */ - public Object affinityKey(); + @Deprecated + @Nullable public Object affinityKey(); + + /** + * @return Partition. + */ + public int partition(); /** * @return Affinity cache name. */ - @Nullable public String affinityCacheName(); + @Nullable public Collection<String> affinityCacheNames(); + + /** + * @return Affinity topology version. 
+ */ + @Nullable public AffinityTopologyVersion topologyVersion(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java index f9b74c4..6f878ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java @@ -51,6 +51,7 @@ import org.apache.ignite.internal.GridInternalWrapper; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.GridProcessorAdapter; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.resource.GridNoImplicitInjection; import org.apache.ignite.internal.util.GridSpinReadWriteLock; import org.apache.ignite.internal.util.IgniteUtils; @@ -439,34 +440,38 @@ public class GridClosureProcessor extends GridProcessorAdapter { } /** - * @param cacheName Cache name. + * @param cacheNames Cache names. + * @param partId Partition. * @param affKey Affinity key. - * @param job Job. + * @param job Closure to execute. * @param nodes Grid nodes. - * @return Job future. + * @return Grid future for collection of closure results. + * @throws IgniteCheckedException If failed. 
*/ - public <R> ComputeTaskInternalFuture<R> affinityCall(@Nullable String cacheName, Object affKey, Callable<R> job, - @Nullable Collection<ClusterNode> nodes) { + public <R> ComputeTaskInternalFuture<R> affinityCall(@NotNull Collection<String> cacheNames, + int partId, + @Nullable Object affKey, + Callable<R> job, + @Nullable Collection<ClusterNode> nodes) throws IgniteCheckedException { + assert partId >= 0 : partId; + busyLock.readLock(); try { if (F.isEmpty(nodes)) return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, U.emptyTopologyException()); - // In case cache key is passed instead of affinity key. - final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey); + final String cacheName = F.first(cacheNames); - final ClusterNode node = ctx.affinity().mapKeyToNode(cacheName, affKey0); + final AffinityTopologyVersion mapTopVer = ctx.discovery().topologyVersionEx(); + final ClusterNode node = ctx.affinity().mapPartitionToNode(cacheName, partId, mapTopVer); if (node == null) return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, U.emptyTopologyException()); ctx.task().setThreadContext(TC_SUBGRID, nodes); - return ctx.task().execute(new T5(node, job, affKey0, cacheName), null, false); - } - catch (IgniteCheckedException e) { - return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, e); + return ctx.task().execute(new T5(node, job, cacheNames, partId, affKey, mapTopVer), null, false); } finally { busyLock.readUnlock(); @@ -474,34 +479,38 @@ public class GridClosureProcessor extends GridProcessorAdapter { } /** - * @param cacheName Cache name. + * @param cacheNames Cache names. + * @param partId Partition. * @param affKey Affinity key. * @param job Job. * @param nodes Grid nodes. * @return Job future. + * @throws IgniteCheckedException If failed. 
*/ - public ComputeTaskInternalFuture<?> affinityRun(@Nullable String cacheName, Object affKey, Runnable job, - @Nullable Collection<ClusterNode> nodes) { + public ComputeTaskInternalFuture<?> affinityRun(@NotNull Collection<String> cacheNames, + int partId, + @Nullable Object affKey, + Runnable job, + @Nullable Collection<ClusterNode> nodes) throws IgniteCheckedException { + assert partId >= 0 : partId; + busyLock.readLock(); try { if (F.isEmpty(nodes)) return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, U.emptyTopologyException()); - // In case cache key is passed instead of affinity key. - final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey); + final String cacheName = F.first(cacheNames); - final ClusterNode node = ctx.affinity().mapKeyToNode(cacheName, affKey0); + final AffinityTopologyVersion mapTopVer = ctx.discovery().topologyVersionEx(); + final ClusterNode node = ctx.affinity().mapPartitionToNode(cacheName, partId, mapTopVer); if (node == null) return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, U.emptyTopologyException()); ctx.task().setThreadContext(TC_SUBGRID, nodes); - return ctx.task().execute(new T4(node, job, affKey0, cacheName), null, false); - } - catch (IgniteCheckedException e) { - return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, e); + return ctx.task().execute(new T4(node, job, cacheNames, partId, affKey, mapTopVer), null, false); } finally { busyLock.readUnlock(); @@ -1183,7 +1192,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { } /** - * + * @return Map. */ public Map<ComputeJob, ClusterNode> map() { return map; @@ -1346,23 +1355,35 @@ public class GridClosureProcessor extends GridProcessorAdapter { private Object affKey; /** */ - private String affCacheName; + private int partId; + + /** */ + private AffinityTopologyVersion topVer; + + /** */ + private Collection<String> affCacheNames; + /** * @param node Cluster node. - * @param job Job. 
+ * @param job Job affinity partition. + * @param affCacheNames Affinity caches. + * @param partId Partition. * @param affKey Affinity key. - * @param affCacheName Affinity cache name. + * @param topVer Affinity topology version. */ - private T4(ClusterNode node, Runnable job, Object affKey, String affCacheName) { + private T4(ClusterNode node, Runnable job, Collection<String> affCacheNames, int partId, Object affKey, + AffinityTopologyVersion topVer) { super(U.peerDeployAware0(job)); - assert affKey != null; + assert partId >= 0; this.node = node; this.job = job; + this.affCacheNames = affCacheNames; + this.partId = partId; this.affKey = affKey; - this.affCacheName = affCacheName; + this.topVer = topVer; } /** {@inheritDoc} */ @@ -1371,13 +1392,23 @@ public class GridClosureProcessor extends GridProcessorAdapter { } /** {@inheritDoc} */ - @Override public Object affinityKey() { - return affKey; + @Override public int partition() { + return partId; } /** {@inheritDoc} */ - @Nullable @Override public String affinityCacheName() { - return affCacheName; + @Nullable @Override public Collection<String> affinityCacheNames() { + return affCacheNames; + } + + /** {@inheritDoc} */ + @Nullable @Override public AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** {@inheritDoc} */ + @Nullable @Override public Object affinityKey() { + return affKey; } } @@ -1398,23 +1429,38 @@ public class GridClosureProcessor extends GridProcessorAdapter { private Object affKey; /** */ - private String affCacheName; + private int partId; + + /** */ + private AffinityTopologyVersion topVer; + + /** */ + private Collection<String> affCacheNames; + + /** * @param node Cluster node. - * @param job Job. + * @param job Job affinity partition. + * @param affCacheNames Affinity caches. + * @param partId Partition. * @param affKey Affinity key. - * @param affCacheName Affinity cache name. + * @param topVer Affinity topology version. 
*/ - private T5(ClusterNode node, Callable<R> job, Object affKey, String affCacheName) { + private T5(ClusterNode node, + Callable<R> job, + Collection<String> affCacheNames, + int partId, + Object affKey, + AffinityTopologyVersion topVer) { super(U.peerDeployAware0(job)); - assert affKey != null; - this.node = node; this.job = job; + this.affCacheNames = affCacheNames; + this.partId = partId; this.affKey = affKey; - this.affCacheName = affCacheName; + this.topVer = topVer; } /** {@inheritDoc} */ @@ -1433,13 +1479,23 @@ public class GridClosureProcessor extends GridProcessorAdapter { } /** {@inheritDoc} */ - @Override public Object affinityKey() { + @Nullable @Override public Object affinityKey() { return affKey; } /** {@inheritDoc} */ - @Nullable @Override public String affinityCacheName() { - return affCacheName; + @Override public int partition() { + return partId; + } + + /** {@inheritDoc} */ + @Nullable @Override public Collection<String> affinityCacheNames() { + return affCacheNames; + } + + /** {@inheritDoc} */ + @Nullable @Override public AffinityTopologyVersion topologyVersion() { + return topVer; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java index a2e9e33..6a162d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java @@ -60,12 +60,17 @@ import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.deployment.GridDeployment; import 
org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.GridProcessorAdapter; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsSnapshot; import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap; import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.GridSpinReadWriteLock; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P1; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.S; @@ -93,6 +98,7 @@ import static org.apache.ignite.internal.GridTopic.TOPIC_JOB_SIBLINGS; import static org.apache.ignite.internal.GridTopic.TOPIC_TASK; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q; /** @@ -420,7 +426,8 @@ public class GridJobProcessor extends GridProcessorAdapter { * @return Siblings. * @throws IgniteCheckedException If failed. 
*/ - public Collection<ComputeJobSibling> requestJobSiblings(final ComputeTaskSession ses) throws IgniteCheckedException { + public Collection<ComputeJobSibling> requestJobSiblings( + final ComputeTaskSession ses) throws IgniteCheckedException { assert ses != null; final UUID taskNodeId = ses.getTaskNodeId(); @@ -628,7 +635,7 @@ public class GridJobProcessor extends GridProcessorAdapter { GridJobWorker activeJob = activeJobs.get(jobId); if (activeJob != null && idsMatch.apply(activeJob)) - cancelActiveJob(activeJob, sys); + cancelActiveJob(activeJob, sys); } } finally { @@ -743,7 +750,7 @@ public class GridJobProcessor extends GridProcessorAdapter { void advance() { assert w == null; - while(iter.hasNext()) { + while (iter.hasNext()) { GridJobWorker w0 = iter.next(); assert !w0.isInternal(); @@ -804,7 +811,7 @@ public class GridJobProcessor extends GridProcessorAdapter { void advance() { assert w == null; - while(iter.hasNext()) { + while (iter.hasNext()) { GridJobWorker w0 = iter.next(); assert !w0.isInternal(); @@ -947,6 +954,15 @@ public class GridJobProcessor extends GridProcessorAdapter { if (log.isDebugEnabled()) log.debug("Received job request message [req=" + req + ", nodeId=" + node.id() + ']'); + PartitionsReservation partsReservation = null; + + if (req.getCacheIds() != null) { + assert req.getPartition() >= 0 : req; + assert !F.isEmpty(req.getCacheIds()) : req; + + partsReservation = new PartitionsReservation(req.getCacheIds(), req.getPartition(), req.getTopVer()); + } + GridJobWorker job = null; if (!rwLock.tryReadLock()) { @@ -1079,7 +1095,9 @@ public class GridJobProcessor extends GridProcessorAdapter { node, req.isInternal(), evtLsnr, - holdLsnr); + holdLsnr, + partsReservation, + req.getTopVer()); jobCtx.job(job); @@ -1330,7 +1348,8 @@ public class GridJobProcessor extends GridProcessorAdapter { null, loc ? null : marsh.marshal(null), null, - false); + false, + null); if (req.isSessionFullSupport()) { // Send response to designated job topic. 
@@ -1472,6 +1491,129 @@ public class GridJobProcessor extends GridProcessorAdapter {
     /**
      *
      */
+    private class PartitionsReservation implements GridReservable {
+        /** Identifiers of the caches whose partition must be reserved. */
+        private final int[] cacheIds;
+
+        /** Partition id, the same for every cache in {@link #cacheIds}. */
+        private final int partId;
+
+        /** Topology version used for partition lookup and the primary-node check. */
+        private final AffinityTopologyVersion topVer;
+
+        /** Reserved partitions; slot {@code i} belongs to {@code cacheIds[i]} and may stay {@code null}. */
+        private final GridDhtLocalPartition[] partitions;
+
+        /**
+         * @param cacheIds Cache identifiers array.
+         * @param partId Partition number.
+         * @param topVer Affinity topology version.
+         */
+        public PartitionsReservation(int[] cacheIds, int partId,
+            AffinityTopologyVersion topVer) {
+            this.cacheIds = cacheIds;
+            this.partId = partId;
+            this.topVer = topVer;
+            partitions = new GridDhtLocalPartition[cacheIds.length];
+        }
+
+        /**
+         * Reserves partition {@code partId} in every cache from {@code cacheIds}. Local caches and
+         * caches with rebalancing disabled are skipped, so their slots stay empty.
+         *
+         * @return {@code True} if every required partition was reserved; {@code false} if a cache is
+         *      missing or not started, or a partition is absent, not {@code OWNING} or refused the
+         *      reservation. Reservations taken so far are released before {@code false} is returned.
+         * @throws IgniteCheckedException If a partition could not be reserved and the local node is
+         *      not primary for it on {@code topVer}.
+         */
+        @Override public boolean reserve() throws IgniteCheckedException {
+            boolean reserved = false;
+
+            try {
+                for (int i = 0; i < cacheIds.length; ++i) {
+                    GridCacheContext<?, ?> cctx = ctx.cache().context().cacheContext(cacheIds[i]);
+
+                    if (cctx == null) // Cache was not found, probably was not deployed yet.
+                        return reserved;
+
+                    if (!cctx.started()) // Cache not started.
+                        return reserved;
+
+                    if (cctx.isLocal() || !cctx.rebalanceEnabled())
+                        continue;
+
+                    boolean checkPartMapping = false;
+
+                    try {
+                        if (cctx.isReplicated()) {
+                            GridDhtLocalPartition part = cctx.topology().localPartition(partId,
+                                topVer, false);
+
+                            // We don't need to reserve partitions because they will not be evicted in replicated caches.
+                            // NOTE(review): the code below nevertheless reserves the partition for replicated caches as well.
+                            if (part == null || part.state() != OWNING) {
+                                checkPartMapping = true;
+
+                                return reserved;
+                            }
+                        }
+
+                        GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false);
+
+                        if (part == null || part.state() != OWNING || !part.reserve()) {
+                            checkPartMapping = true;
+
+                            return reserved;
+                        }
+
+                        partitions[i] = part;
+
+                        // Double check that we are still in owning state and partition contents are not cleared.
+                        if (part.state() != OWNING) {
+                            checkPartMapping = true;
+
+                            return reserved;
+                        }
+                    }
+                    finally {
+                        // A failure on a node that is not primary for the partition is raised as an error.
+                        if (checkPartMapping && !cctx.affinity().primary(partId, topVer).id().equals(ctx.localNodeId()))
+                            throw new IgniteCheckedException("Failed partition reservation. "
+                                + "Partition is not primary on the node. [partition=" + partId + ", cacheName=" + cctx.name()
+                                + ", nodeId=" + ctx.localNodeId() + ", topology=" + topVer + ']');
+                    }
+                }
+
+                reserved = true;
+            }
+            finally {
+                // Roll back partial reservations on any failed or exceptional exit.
+                if (!reserved)
+                    release();
+            }
+
+            return true;
+        }
+
+        /**
+         * Releases every partition held by this reservation and clears its slot.
+         */
+        @Override public void release() {
+            for (int i = 0; i < partitions.length; ++i) {
+                // Slots of skipped caches stay empty while later slots may be reserved, so keep scanning.
+                if (partitions[i] == null)
+                    continue;
+
+                partitions[i].release();
+                partitions[i] = null;
+            }
+        }
+    }
+
+    /**
+     *
+     */
     private class CollisionJobContext extends GridCollisionJobContextAdapter {
         /** */
         private final boolean passive;

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index 5b04d6f..16fadaf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -43,6 +43,9 @@ import org.apache.ignite.internal.GridJobSessionImpl;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
+import
org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.typedef.F; @@ -154,6 +157,12 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { /** Hold/unhold listener to notify job processor. */ private final GridJobHoldListener holdLsnr; + /** Partitions to reservations. */ + private final GridReservable partsReservation; + + /** Request topology version. */ + private final AffinityTopologyVersion reqTopVer; + /** * @param ctx Kernal context. * @param dep Grid deployment. @@ -166,6 +175,8 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { * @param internal Whether or not task was marked with {@link GridInternal} * @param evtLsnr Job event listener. * @param holdLsnr Hold listener. + * @param partsReservation Reserved partitions (must be released at the job finish). + * @param reqTopVer Affinity topology version of the job request. */ GridJobWorker( GridKernalContext ctx, @@ -178,7 +189,9 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { ClusterNode taskNode, boolean internal, GridJobEventListener evtLsnr, - GridJobHoldListener holdLsnr) { + GridJobHoldListener holdLsnr, + GridReservable partsReservation, + AffinityTopologyVersion reqTopVer) { super(ctx.gridName(), "grid-job-worker", ctx.log(GridJobWorker.class)); assert ctx != null; @@ -199,6 +212,8 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { this.taskNode = taskNode; this.internal = internal; this.holdLsnr = holdLsnr; + this.partsReservation = partsReservation; + this.reqTopVer = reqTopVer; if (job != null) this.job = job; @@ -471,96 +486,128 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { // Make sure flag is not set for current thread. 
HOLD.set(false); - if (isCancelled()) - // If job was cancelled prior to assigning runner to it? - super.cancel(); + try { + if (partsReservation != null) { + try { + if (!partsReservation.reserve()) { + finishJob(null, null, true, true); - if (!skipNtf) { - if (holdLsnr.onUnheld(this)) - held.decrementAndGet(); - else { - if (log.isDebugEnabled()) - log.debug("Ignoring job execution (job was not held)."); + return; + } + } + catch (Exception e) { + IgniteException ex = new IgniteException("Failed to lock partitions " + + "[jobId=" + ses.getJobId() + ", ses=" + ses + ']', e); - return; + U.error(log, "Failed to lock partitions [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e);; + + finishJob(null, ex, true); + + return; + } } - } - boolean sndRes = true; + if (isCancelled()) + // If job was cancelled prior to assigning runner to it? + super.cancel(); - Object res = null; + if (!skipNtf) { + if (holdLsnr.onUnheld(this)) + held.decrementAndGet(); + else { + if (log.isDebugEnabled()) + log.debug("Ignoring job execution (job was not held)."); - IgniteException ex = null; + return; + } + } - try { - ctx.job().currentTaskSession(ses); - - // If job has timed out, then - // avoid computation altogether. - if (isTimedOut()) - sndRes = false; - else { - res = U.wrapThreadLoader(dep.classLoader(), new Callable<Object>() { - @Nullable @Override public Object call() { - try { - if (internal && ctx.config().isPeerClassLoadingEnabled()) - ctx.job().internal(true); + boolean sndRes = true; - return job.execute(); - } - finally { - if (internal && ctx.config().isPeerClassLoadingEnabled()) - ctx.job().internal(false); + Object res = null; + + IgniteException ex = null; + + try { + ctx.job().currentTaskSession(ses); + + if (reqTopVer != null) + GridQueryProcessor.setRequestAffinityTopologyVersion(reqTopVer); + + // If job has timed out, then + // avoid computation altogether. 
+ if (isTimedOut()) + sndRes = false; + else { + res = U.wrapThreadLoader(dep.classLoader(), new Callable<Object>() { + @Nullable @Override public Object call() { + try { + if (internal && ctx.config().isPeerClassLoadingEnabled()) + ctx.job().internal(true); + + return job.execute(); + } + finally { + if (internal && ctx.config().isPeerClassLoadingEnabled()) + ctx.job().internal(false); + } } - } - }); + }); - if (log.isDebugEnabled()) - log.debug("Job execution has successfully finished [job=" + job + ", res=" + res + ']'); + if (log.isDebugEnabled()) + log.debug("Job execution has successfully finished [job=" + job + ", res=" + res + ']'); + } } - } - catch (IgniteException e) { - if (sysStopping && e.hasCause(IgniteInterruptedCheckedException.class, InterruptedException.class)) { - ex = handleThrowable(e); + catch (IgniteException e) { + if (sysStopping && e.hasCause(IgniteInterruptedCheckedException.class, InterruptedException.class)) { + ex = handleThrowable(e); - assert ex != null; - } - else { - if (X.hasCause(e, GridInternalException.class) || X.hasCause(e, IgfsOutOfSpaceException.class)) { - // Print exception for internal errors only if debug is enabled. - if (log.isDebugEnabled()) - U.error(log, "Failed to execute job [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e); + assert ex != null; } - else if (X.hasCause(e, InterruptedException.class)) { - String msg = "Job was cancelled [jobId=" + ses.getJobId() + ", ses=" + ses + ']'; + else { + if (X.hasCause(e, GridInternalException.class) || X.hasCause(e, IgfsOutOfSpaceException.class)) { + // Print exception for internal errors only if debug is enabled. 
+ if (log.isDebugEnabled()) + U.error(log, "Failed to execute job [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e); + } + else if (X.hasCause(e, InterruptedException.class)) { + String msg = "Job was cancelled [jobId=" + ses.getJobId() + ", ses=" + ses + ']'; - if (log.isDebugEnabled()) - U.error(log, msg, e); + if (log.isDebugEnabled()) + U.error(log, msg, e); + else + U.warn(log, msg); + } else - U.warn(log, msg); + U.error(log, "Failed to execute job [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e); + + ex = e; } - else - U.error(log, "Failed to execute job [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e); + } + // Catch Throwable to protect against bad user code except + // InterruptedException if job is being cancelled. + catch (Throwable e) { + ex = handleThrowable(e); - ex = e; + assert ex != null; + + if (e instanceof Error) + throw (Error)e; } - } - // Catch Throwable to protect against bad user code except - // InterruptedException if job is being cancelled. - catch (Throwable e) { - ex = handleThrowable(e); + finally { + // Finish here only if not held by this thread. + if (!HOLD.get()) + finishJob(res, ex, sndRes); - assert ex != null; + ctx.job().currentTaskSession(null); - if (e instanceof Error) - throw (Error)e; + if (reqTopVer != null) + GridQueryProcessor.setRequestAffinityTopologyVersion(null); + } } finally { - // Finish here only if not held by this thread. - if (!HOLD.get()) - finishJob(res, ex, sndRes); - - ctx.job().currentTaskSession(null); + if (partsReservation != null) + partsReservation.release(); } } @@ -686,7 +733,20 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { */ void finishJob(@Nullable Object res, @Nullable IgniteException ex, - boolean sndReply) + boolean sndReply) { + finishJob(res, ex, sndReply, false); + } + + /** + * @param res Resuilt. + * @param ex Exception + * @param sndReply If {@code true}, reply will be sent. + * @param retry If {@code true}, retry response will be sent. 
+ */ + void finishJob(@Nullable Object res, + @Nullable IgniteException ex, + boolean sndReply, + boolean retry) { // Avoid finishing a job more than once from different threads. if (!finishing.compareAndSet(false, true)) @@ -750,7 +810,8 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { loc ? res : null, loc ? null : marsh.marshal(attrs), loc ? attrs : null, - isCancelled()); + isCancelled(), + retry ? ctx.cache().context().exchange().readyAffinityVersion() : null); long timeout = ses.getEndTime() - U.currentTimeMillis();
