This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new a27d21bff3c IGNITE-22349 .NET: Add compute task session support
(#11373)
a27d21bff3c is described below
commit a27d21bff3cd5563ab5e3f55cae52df76a917cc8
Author: kukushal <[email protected]>
AuthorDate: Wed Jun 26 11:09:33 2024 +0300
IGNITE-22349 .NET: Add compute task session support (#11373)
Add compute task session support to .NET. This allows the user to change
the priority of a compute job via the `grid.task.priority` attribute (see
[docs](https://ignite.apache.org/docs/latest/distributed-computing/job-scheduling#priority-ordering)).
* The new `IComputeTaskSession` interface supports the `SetAttributes` and
`GetAttribute` operations with the same
signatures as their Java `ComputeTaskSession` counterparts.
* The `ComputeTaskSession` implementation delegates operations to the
Java side via `PlatformJniTarget`, which
receives the unmanaged session pointer in the JNI `ComputeTaskMap` and
`ComputeJobExecute` callbacks.
* On the Java side, the new `PlatformComputeTaskSession` class receives
the JNI calls and delegates operations to
the wrapped `ComputeTaskSession`.
* The `ComputeTaskSession` on the Java side is injected into
`PlatformFullJob` or `PlatformFullTask`. At first glance
the injection might seem slow. However, injectors are cached, so subsequent
injections would be as fast as setting the
field directly. The alternative approach of passing `ComputeTaskSession`
directly to the `PlatformFullJob` and
`PlatformFullTask` constructors seems to require lots of code changes.
* The new .NET `ComputeTaskSessionFullSupportAttribute` explicitly enables
the compute session attributes API in the
same way Java's
[ComputeTaskSessionFullSupport](https://github.com/apache/ignite/blob/2.16.0/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSessionFullSupport.java)
annotation does. This flag is passed to Java as a parameter of the `Execute` JNI
operation and stored in the `PlatformFullTask`.
* The new .NET `TaskSessionResourceAttribute` injects the
`IComputeTaskSession` into implementations of `IComputeTask` and `IComputeJob`.
* Added automated tests for distributing session attributes on the same and
different nodes.
* Documentation: added new "C#/.NET" tab in the `MyUrgentTask` code snippet
in the [Priority
Ordering](https://ignite.apache.org/docs/latest/distributed-computing/job-scheduling#priority-ordering)
section.
---
docs/_docs/code-snippets/dotnet/JobScheduling.cs | 76 ++++++++++++
.../distributed-computing/job-scheduling.adoc | 12 +-
.../platform/callback/PlatformCallbackGateway.java | 17 ++-
.../platform/compute/PlatformAbstractJob.java | 6 +-
.../platform/compute/PlatformClosureJob.java | 4 +-
.../platform/compute/PlatformCompute.java | 4 +-
.../compute/PlatformComputeTaskSession.java | 85 ++++++++++++++
.../platform/compute/PlatformFullJob.java | 16 ++-
.../platform/compute/PlatformFullTask.java | 32 +++++-
.../processors/task/GridTaskProcessor.java | 6 +-
.../cpp/core/src/impl/ignite_environment.cpp | 18 +--
.../Compute/ComputeTaskSessionTest.cs | 127 +++++++++++++++++++++
.../Compute/IComputeTaskSession.cs | 39 +++++++
.../Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs | 1 +
.../Apache.Ignite.Core/Impl/Compute/ComputeJob.cs | 10 ++
.../Impl/Compute/ComputeJobHolder.cs | 12 +-
.../Impl/Compute/ComputeRunner.cs | 16 ++-
.../Impl/Compute/ComputeTaskHolder.cs | 22 +++-
.../Impl/Compute/ComputeTaskSession.cs | 53 +++++++++
.../ComputeTaskSessionFullSupportAttribute.cs | 36 ++++++
.../Impl/Resource/ResourceProcessor.cs | 17 +++
.../Impl/Resource/ResourceTypeDescriptor.cs | 41 ++++++-
.../Impl/Unmanaged/UnmanagedCallbacks.cs | 32 ++++--
.../Resource/TaskSessionResourceAttribute.cs | 33 ++++++
24 files changed, 661 insertions(+), 54 deletions(-)
diff --git a/docs/_docs/code-snippets/dotnet/JobScheduling.cs
b/docs/_docs/code-snippets/dotnet/JobScheduling.cs
new file mode 100644
index 00000000000..cd11e61df8a
--- /dev/null
+++ b/docs/_docs/code-snippets/dotnet/JobScheduling.cs
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Apache.Ignite.Core;
+using Apache.Ignite.Core.Compute;
+using Apache.Ignite.Core.Resource;
+
+namespace dotnet_helloworld
+{
+ public class JobScheduling
+ {
+ public void Priority()
+ {
+ // tag::priority[]
+ // PriorityQueueCollisionSpi must be configured in the Spring XML
configuration file ignite-helloworld.xml
+ var cfg = new IgniteConfiguration
+ {
+ SpringConfigUrl = "ignite-helloworld.xml"
+ };
+
+ // Start a node.
+ using var ignite = Ignition.Start(cfg);
+ // end::priority[]
+ }
+
+ // tag::task-priority[]
+ // Compute tasks must be annotated with the
ComputeTaskSessionFullSupport attribute to support distributing
+ // the task's session attributes to compute jobs that the task creates.
+ [ComputeTaskSessionFullSupport]
+ public class MyUrgentTask : ComputeTaskSplitAdapter<int, bool, bool>
+ {
+ // Auto-injected task session.
+ [TaskSessionResource] private IComputeTaskSession _taskSes;
+
+ /// <inheritdoc />
+ protected override ICollection<IComputeJob<bool>> Split(int
gridSize, int arg)
+ {
+ // Set high task priority.
+ _taskSes.SetAttribute("grid.task.priority", 10);
+
+ var jobs = new List<IComputeJob<bool>>(gridSize);
+
+ for (var i = 1; i <= gridSize; i++)
+ {
+ jobs.Add(new MyUrgentJob());
+ }
+
+ // These jobs will be executed with higher priority.
+ return jobs;
+ }
+
+ /// <inheritdoc />
+ public override bool Reduce(IList<IComputeJobResult<bool>>
results) => results.All(r => r.Data);
+ }
+ // end::task-priority[]
+
+ private class MyUrgentJob : ComputeJobAdapter<bool>
+ {
+ public override bool Execute() => true;
+ }
+ }
+}
\ No newline at end of file
diff --git a/docs/_docs/distributed-computing/job-scheduling.adoc
b/docs/_docs/distributed-computing/job-scheduling.adoc
index c242a69c44c..fffde3274bd 100644
--- a/docs/_docs/distributed-computing/job-scheduling.adoc
+++ b/docs/_docs/distributed-computing/job-scheduling.adoc
@@ -15,6 +15,7 @@
= Job Scheduling
:javaFile: {javaCodeDir}/JobScheduling.java
+:csharpFile: {csharpCodeDir}/JobScheduling.cs
When jobs arrive at the destination node, they are submitted to a thread pool
and scheduled for execution in random order.
However, you can change job ordering by configuring `CollisionSpi`.
@@ -70,9 +71,16 @@ tab:C++[unsupported]
Task priorities are set in the
link:distributed-computing/map-reduce#distributed-task-session[task session]
via the `grid.task.priority` attribute. If no priority is assigned to a task,
then the default priority of 0 is used.
-
+[tabs]
+--
+tab:Java[]
[source, java]
----
include::{javaFile}[tag=task-priority,indent=0]
----
-
+tab:C#/.NET[]
+[source,csharp]
+----
+include::{csharpFile}[tag=task-priority,indent=0]
+----
+--
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
index 5d6e57d3fae..cd54aed9cdf 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
@@ -181,12 +181,14 @@ public class PlatformCallbackGateway {
* Perform native task map. Do not throw exceptions, serializing them to
the output stream instead.
*
* @param memPtr Memory pointer.
+ * @param ses Platform compute task session proxy.
*/
- public void computeTaskMap(long memPtr) {
+ public void computeTaskMap(long memPtr, PlatformTargetProxy ses) {
enter();
try {
- PlatformCallbackUtils.inLongOutLong(envPtr,
PlatformCallbackOp.ComputeTaskMap, memPtr);
+ PlatformCallbackUtils.inLongLongLongObjectOutLong(
+ envPtr, PlatformCallbackOp.ComputeTaskMap, memPtr, 0, 0, ses);
}
finally {
leave();
@@ -304,13 +306,14 @@ public class PlatformCallbackGateway {
*
* @param jobPtr Job pointer.
* @param cancel Cancel flag.
+ * @param ses Platform compute task session proxy.
*/
- public void computeJobExecuteLocal(long jobPtr, long cancel) {
+ public void computeJobExecuteLocal(long jobPtr, long cancel,
PlatformTargetProxy ses) {
enter();
try {
PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
- PlatformCallbackOp.ComputeJobExecuteLocal, jobPtr, cancel, 0,
null);
+ PlatformCallbackOp.ComputeJobExecuteLocal, jobPtr, cancel, 0,
ses);
}
finally {
leave();
@@ -321,12 +324,14 @@ public class PlatformCallbackGateway {
* Execute native job on a node other than where it was created.
*
* @param memPtr Memory pointer.
+ * @param ses Platform compute task session proxy.
*/
- public void computeJobExecute(long memPtr) {
+ public void computeJobExecute(long memPtr, PlatformTargetProxy ses) {
enter();
try {
- PlatformCallbackUtils.inLongOutLong(envPtr,
PlatformCallbackOp.ComputeJobExecute, memPtr);
+ PlatformCallbackUtils.inLongLongLongObjectOutLong(
+ envPtr, PlatformCallbackOp.ComputeJobExecute, memPtr, 0, 0,
ses);
}
finally {
leave();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
index fbde0d8aa9b..eb6c36838fc 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
@@ -23,6 +23,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.PlatformProcessor;
+import org.apache.ignite.internal.processors.platform.PlatformTargetProxy;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import
org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
@@ -130,13 +131,14 @@ public abstract class PlatformAbstractJob implements
PlatformJob, Externalizable
*
* @param ctx Context.
* @param cancel Cancel flag.
+ * @param ses Platform compute task session proxy.
* @return Result.
*/
- protected Object runLocal(PlatformContext ctx, boolean cancel) {
+ protected Object runLocal(PlatformContext ctx, boolean cancel,
PlatformTargetProxy ses) {
// Local job, must execute it with respect to possible concurrent task
completion.
if (task.onJobLock()) {
try {
- ctx.gateway().computeJobExecuteLocal(ptr, cancel ? 1 : 0);
+ ctx.gateway().computeJobExecuteLocal(ptr, cancel ? 1 : 0, ses);
return LOC_JOB_RES;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
index 3bcb54995bf..3c8db258fe0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
@@ -73,7 +73,7 @@ public class PlatformClosureJob extends PlatformAbstractJob {
out.synchronize();
- ctx.gateway().computeJobExecute(mem.pointer());
+ ctx.gateway().computeJobExecute(mem.pointer(), null);
PlatformInputStream in = mem.input();
@@ -91,7 +91,7 @@ public class PlatformClosureJob extends PlatformAbstractJob {
// Local job execution.
assert ptr != 0;
- return runLocal(ctx, false);
+ return runLocal(ctx, false, null);
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
index 90fe4ff28c4..5fef0eacb6b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
@@ -163,8 +163,10 @@ public class PlatformCompute extends
PlatformAbstractTarget {
long taskPtr = reader.readLong();
long topVer = reader.readLong();
String taskName = reader.readString();
+ boolean taskSesFullSupport = reader.readBoolean();
- final PlatformFullTask task = new
PlatformFullTask(platformCtx, platformGrp, taskPtr, topVer, taskName);
+ final PlatformFullTask task = new PlatformFullTask(
+ platformCtx, platformGrp, taskPtr, topVer, taskName,
taskSesFullSupport);
return executeNative0(task);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformComputeTaskSession.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformComputeTaskSession.java
new file mode 100644
index 00000000000..aee9888a3bf
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformComputeTaskSession.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.compute.ComputeTaskSession;
+import org.apache.ignite.internal.binary.BinaryRawReaderEx;
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+
+import static
org.apache.ignite.internal.processors.platform.utils.PlatformUtils.readMap;
+
+/** {@link ComputeTaskSession} platform wrapper. */
+public class PlatformComputeTaskSession extends PlatformAbstractTarget {
+ /** "get attribute" operation code. */
+ private static final int OP_GET_ATTRIBUTE = 1;
+
+ /** "set attributes" operation code. */
+ private static final int OP_SET_ATTRIBUTES = 2;
+
+ /** Underlying compute task session. */
+ private final ComputeTaskSession ses;
+
+ /**
+ * Constructor.
+ *
+ * @param platformCtx Context.
+ * @param ses Underlying compute task session
+ */
+ public PlatformComputeTaskSession(final PlatformContext platformCtx, final
ComputeTaskSession ses) {
+ super(platformCtx);
+
+ this.ses = ses;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long processInStreamOutLong(
+ final int type, final BinaryRawReaderEx reader, final PlatformMemory
mem) throws IgniteCheckedException {
+
+ if (type == OP_SET_ATTRIBUTES) {
+ final Map<?, ?> attrs = readMap(reader);
+
+ ses.setAttributes(attrs);
+
+ return TRUE;
+ }
+
+ return super.processInStreamOutLong(type, reader, mem);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void processInStreamOutStream(
+ final int type, final BinaryRawReaderEx reader, final
BinaryRawWriterEx writer) throws IgniteCheckedException {
+
+ if (type == OP_GET_ATTRIBUTE) {
+ final Object key = reader.readObjectDetached();
+
+ final Object val = ses.getAttribute(key);
+
+ writer.writeObjectDetached(val);
+
+ return;
+ }
+
+ super.processInStreamOutStream(type, reader, writer);
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
index 4bf3b2f9d70..83a96dd0f8b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
@@ -22,13 +22,18 @@ import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.compute.ComputeTaskSession;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.PlatformProcessor;
+import org.apache.ignite.internal.processors.platform.PlatformTarget;
+import org.apache.ignite.internal.processors.platform.PlatformTargetProxy;
+import org.apache.ignite.internal.processors.platform.PlatformTargetProxyImpl;
import
org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import
org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+import org.apache.ignite.resources.TaskSessionResource;
import org.jetbrains.annotations.Nullable;
/**
@@ -64,6 +69,10 @@ public class PlatformFullJob extends PlatformAbstractJob {
/** Serialized job. */
private transient byte state;
+ /** Task session of this job. */
+ @TaskSessionResource
+ private transient ComputeTaskSession ses;
+
/**
* {@link Externalizable} support.
*/
@@ -111,8 +120,11 @@ public class PlatformFullJob extends PlatformAbstractJob {
}
try {
+ final PlatformTarget platformSes = new
PlatformComputeTaskSession(ctx, ses);
+ final PlatformTargetProxy platformSesProxy = new
PlatformTargetProxyImpl(platformSes, ctx);
+
if (task != null)
- return runLocal(ctx, cancel);
+ return runLocal(ctx, cancel, platformSesProxy);
else {
try (PlatformMemory mem = ctx.memory().allocate()) {
PlatformOutputStream out = mem.output();
@@ -122,7 +134,7 @@ public class PlatformFullJob extends PlatformAbstractJob {
out.synchronize();
- ctx.gateway().computeJobExecute(mem.pointer());
+ ctx.gateway().computeJobExecute(mem.pointer(),
platformSesProxy);
PlatformInputStream in = mem.input();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
index fd799b6b46f..40beeea1328 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
@@ -26,15 +26,20 @@ import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeTaskNoResultCache;
+import org.apache.ignite.compute.ComputeTaskSession;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.PlatformTarget;
+import org.apache.ignite.internal.processors.platform.PlatformTargetProxy;
+import org.apache.ignite.internal.processors.platform.PlatformTargetProxyImpl;
import
org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import
org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManager;
import
org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.TaskSessionResource;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -55,6 +60,13 @@ public final class PlatformFullTask extends
PlatformAbstractTask {
/** Platform task name. */
private final String taskName;
+ /** {@code true} if distribution of the session attributes should be
enabled. */
+ private final boolean taskSesFullSupport;
+
+ /** The task session. */
+ @TaskSessionResource
+ private ComputeTaskSession ses;
+
/**
* Constructor.
*
@@ -63,13 +75,21 @@ public final class PlatformFullTask extends
PlatformAbstractTask {
* @param taskPtr Pointer to the task in the native platform.
* @param topVer Initial topology version.
* @param taskName Task name.
+ * @param taskSesFullSupport {@code true} if distribution of the session
attributes should be enabled.
*/
- public PlatformFullTask(PlatformContext ctx, ClusterGroup grp, long
taskPtr, long topVer, String taskName) {
+ public PlatformFullTask(
+ PlatformContext ctx,
+ ClusterGroup grp,
+ long taskPtr,
+ long topVer,
+ String taskName,
+ boolean taskSesFullSupport) {
super(ctx, taskPtr);
this.grp = grp;
this.topVer = topVer;
this.taskName = taskName;
+ this.taskSesFullSupport = taskSesFullSupport;
}
/** {@inheritDoc} */
@@ -86,6 +106,9 @@ public final class PlatformFullTask extends
PlatformAbstractTask {
PlatformMemoryManager memMgr = ctx.memory();
+ final PlatformTarget platformSes = new
PlatformComputeTaskSession(ctx, ses);
+ final PlatformTargetProxy platformSesProxy = new
PlatformTargetProxyImpl(platformSes, ctx);
+
try (PlatformMemory mem = memMgr.allocate()) {
PlatformOutputStream out = mem.output();
@@ -97,7 +120,7 @@ public final class PlatformFullTask extends
PlatformAbstractTask {
out.synchronize();
- ctx.gateway().computeTaskMap(mem.pointer());
+ ctx.gateway().computeTaskMap(mem.pointer(), platformSesProxy);
PlatformInputStream in = mem.input();
@@ -113,6 +136,11 @@ public final class PlatformFullTask extends
PlatformAbstractTask {
}
}
+ /** {@code true} if distribution of session attributes should be enabled.
*/
+ public boolean taskSessionFullSupport() {
+ return taskSesFullSupport;
+ }
+
/**
* Write topology information.
*
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 1761b7a1c20..099acc974ea 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -71,6 +71,7 @@ import
org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupp
import org.apache.ignite.internal.processors.job.ComputeJobStatusEnum;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
+import org.apache.ignite.internal.processors.platform.compute.PlatformFullTask;
import org.apache.ignite.internal.processors.task.monitor.ComputeGridMonitor;
import org.apache.ignite.internal.processors.task.monitor.ComputeTaskStatus;
import
org.apache.ignite.internal.processors.task.monitor.ComputeTaskStatusSnapshot;
@@ -642,8 +643,9 @@ public class GridTaskProcessor extends GridProcessorAdapter
implements IgniteCha
if (log.isDebugEnabled())
log.debug("Task deployment: " + dep);
- boolean fullSup = dep != null && taskCls != null &&
- dep.annotation(taskCls, ComputeTaskSessionFullSupport.class) !=
null;
+ boolean fullSup = (dep != null && taskCls != null &&
+ dep.annotation(taskCls, ComputeTaskSessionFullSupport.class) !=
null) ||
+ (task instanceof PlatformFullTask &&
((PlatformFullTask)task).taskSessionFullSupport());
Collection<UUID> top = null;
diff --git a/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
b/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
index 4c84f162734..d845c458169 100644
--- a/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
+++ b/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
@@ -198,15 +198,6 @@ namespace ignite
break;
}
- case OperationCallback::COMPUTE_JOB_EXECUTE:
- {
- SharedPointer<InteropMemory> mem =
env->Get()->GetMemory(val);
-
- env->Get()->ComputeJobExecute(mem);
-
- break;
- }
-
case OperationCallback::COMPUTE_JOB_DESTROY:
{
env->Get()->ComputeJobDestroy(val);
@@ -319,6 +310,15 @@ namespace ignite
switch (type)
{
+ case OperationCallback::COMPUTE_JOB_EXECUTE:
+ {
+ SharedPointer<InteropMemory> mem =
env->Get()->GetMemory(val1);
+
+ env->Get()->ComputeJobExecute(mem);
+
+ break;
+ }
+
case OperationCallback::COMPUTE_JOB_EXECUTE_LOCAL:
{
env->Get()->ComputeJobExecuteLocal(val1);
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeTaskSessionTest.cs
b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeTaskSessionTest.cs
new file mode 100644
index 00000000000..341b636a78d
--- /dev/null
+++
b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeTaskSessionTest.cs
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Compute
+{
+ using System.Collections.Generic;
+ using System.Linq;
+ using Apache.Ignite.Core.Compute;
+ using Apache.Ignite.Core.Resource;
+ using NUnit.Framework;
+ using static TestUtils;
+
+ /// <summary>
+ /// Tests for <see cref="IComputeTaskSession"/>
+ /// </summary>
+ public class ComputeTaskSessionTest
+ {
+ /// <summary>
+ /// Data stored in a session by a task is available in the compute job
created by the task on another node.
+ /// </summary>
+ [Test]
+ public void DistributesTaskSessionAttributeRemotely()
+ {
+ // Given an Ignite cluster consisting of server and client nodes
+ using var ignored =
Ignition.Start(GetIgniteConfiguration("server1"));
+ using var ignite =
Ignition.Start(GetIgniteConfiguration("client1", true));
+
+ // When the user executes a task setting a session attribute and
creating a job getting the attribute
+ const string attrName = "attr1";
+ const int attrValue = 123;
+ var task = new SessionAttributeSetterTask(attrName);
+ var sessionValue = ignite.GetCompute().Execute(task, attrValue);
+
+ // Then the task returns the same attribute value
+ Assert.AreEqual(attrValue, sessionValue);
+ }
+
+ /// <summary>
+ /// Data stored in session by a task is available in the compute job
created by the task on the same node.
+ /// </summary>
+ [Test]
+ public void DistributesTaskSessionAttributeLocally()
+ {
+ // Given a single node Ignite cluster
+ using var ignite =
Ignition.Start(GetIgniteConfiguration("server1"));
+
+ // When the user executes a task setting a session attribute and
creating a job getting the attribute
+ const string attrName = "attr1";
+ const int attrValue = 123;
+ var task = new SessionAttributeSetterTask(attrName);
+ var sessionValue = ignite.GetCompute().Execute(task, attrValue);
+
+ // Then the task returns the same attribute value
+ Assert.AreEqual(attrValue, sessionValue);
+ }
+
+ private static IgniteConfiguration GetIgniteConfiguration(string
igniteName, bool isClient = false) =>
+ new IgniteConfiguration
+ {
+ ClientMode = isClient,
+ ConsistentId = igniteName,
+ IgniteInstanceName = igniteName,
+ DiscoverySpi = GetStaticDiscovery(),
+ JvmOptions = TestJavaOptions()
+ };
+
+ /// <summary>
+ /// Sets the specified session attribute and creates one <see
cref="SessionAttributeGetterJob"/>.
+ /// </summary>
+ [ComputeTaskSessionFullSupport]
+ private class SessionAttributeSetterTask :
ComputeTaskSplitAdapter<int, int, int>
+ {
+ private readonly string _attrName;
+#pragma warning disable 649
+ [TaskSessionResource] private IComputeTaskSession _taskSession;
+#pragma warning restore 649
+
+ public SessionAttributeSetterTask(string attrName)
+ {
+ _attrName = attrName;
+ }
+
+ /// <inheritdoc />
+ public override int Reduce(IList<IComputeJobResult<int>> results)
=> results.Select(res => res.Data).Sum();
+
+ /// <inheritdoc />
+ protected override ICollection<IComputeJob<int>> Split(int
gridSize, int attrValue)
+ {
+ _taskSession.SetAttributes(KeyValuePair.Create(_attrName,
attrValue));
+ return new List<IComputeJob<int>> {new
SessionAttributeGetterJob(_attrName)};
+ }
+ }
+
+ /// <summary>
+ /// Returns the specified session attribute.
+ /// </summary>
+ private class SessionAttributeGetterJob : ComputeJobAdapter<int>
+ {
+ private readonly string _attrName;
+#pragma warning disable 649
+ [TaskSessionResource] private IComputeTaskSession _taskSession;
+#pragma warning restore 649
+
+ public SessionAttributeGetterJob(string attrName)
+ {
+ _attrName = attrName;
+ }
+
+ /// <inheritdoc />
+ public override int Execute() => _taskSession.GetAttribute<string,
int>(_attrName);
+ }
+ }
+}
\ No newline at end of file
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeTaskSession.cs
b/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeTaskSession.cs
new file mode 100644
index 00000000000..e94e6b3e95a
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeTaskSession.cs
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Compute
+{
+ using System.Collections.Generic;
+
+ /// <summary>
+ /// Stores custom compute task attributes. Specific compute task
implementations must be annotated with the
+ /// <see cref="ComputeTaskSessionFullSupportAttribute"/> to enable
distributing the task attributes to the compute
+ /// jobs that the task creates.
+ /// </summary>
+ public interface IComputeTaskSession
+ {
+ /// <summary>
+ /// Gets the value of the given key or <c>null</c> if the key does not
exist.
+ /// </summary>
+ TV GetAttribute<TK, TV>(TK key);
+
+ /// <summary>
+ /// Stores the collection of attributes.
+ /// </summary>
+ void SetAttributes<TK, TV>(params KeyValuePair<TK, TV>[] attrs);
+ }
+}
\ No newline at end of file
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
index af2f793000a..8e9a6a1dd13 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
@@ -237,6 +237,7 @@ namespace Apache.Ignite.Core.Impl.Compute
s.WriteLong(ptr);
s.WriteLong(_prj.TopologyVersion);
s.WriteString(GetComputeExecutableName(task));
+ s.WriteBoolean(holder.TaskSessionFullSupport);
});
var future = holder.Future;
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs
index c94ebcdad36..f684b047e36 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs
@@ -130,6 +130,16 @@ namespace Apache.Ignite.Core.Impl.Compute
ResourceProcessor.Inject(Job, (Ignite)ignite);
}
+ /// <summary>
+ /// Injects compute task session into wrapped object.
+ /// </summary>
+ [TaskSessionResource]
+ public void InjectTaskSession(IComputeTaskSession taskSes)
+ {
+ // Propagate injection
+ ResourceProcessor.InjectComputeTaskSession(Job, taskSes);
+ }
+
/// <summary>
/// Gets the inner job.
/// </summary>
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobHolder.cs
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobHolder.cs
index 7d83380a8a3..228b66809fe 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobHolder.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobHolder.cs
@@ -15,6 +15,8 @@
* limitations under the License.
*/
+using Apache.Ignite.Core.Compute;
+
namespace Apache.Ignite.Core.Impl.Compute
{
using System;
@@ -71,11 +73,12 @@ namespace Apache.Ignite.Core.Impl.Compute
/// Executes local job.
/// </summary>
/// <param name="cancel">Cancel flag.</param>
+ /// <param name="taskSes">Compute task session.</param>
[SuppressMessage("Microsoft.Design",
"CA1031:DoNotCatchGeneralExceptionTypes",
Justification = "User code can throw any exception type.")]
- public void ExecuteLocal(bool cancel)
+ public void ExecuteLocal(bool cancel, IComputeTaskSession taskSes)
{
- ComputeRunner.InjectResources(_ignite, _job);
+ ComputeRunner.InjectResources(_ignite, taskSes, _job);
var nodeId = _ignite.GetIgnite().GetCluster().GetLocalNode().Id;
@@ -99,9 +102,10 @@ namespace Apache.Ignite.Core.Impl.Compute
/// </summary>
/// <param name="cancel">Whether the job must be cancelled.</param>
/// <param name="stream">Stream.</param>
- public void ExecuteRemote(PlatformMemoryStream stream, bool cancel)
+ /// <param name="taskSes">Compute task session.</param>
+ public void ExecuteRemote(PlatformMemoryStream stream, bool cancel,
IComputeTaskSession taskSes)
{
- ComputeRunner.ExecuteJobAndWriteResults(_ignite, stream, _job, _
=> Execute0(cancel));
+ ComputeRunner.ExecuteJobAndWriteResults(_ignite, taskSes, stream,
_job, _ => Execute0(cancel));
}
/// <summary>
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeRunner.cs
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeRunner.cs
index 10170fbcfc6..0f57b2f4d8d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeRunner.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeRunner.cs
@@ -15,6 +15,8 @@
* limitations under the License.
*/
+using Apache.Ignite.Core.Compute;
+
namespace Apache.Ignite.Core.Impl.Compute
{
using System;
@@ -37,7 +39,11 @@ namespace Apache.Ignite.Core.Impl.Compute
/// </summary>
[SuppressMessage("Microsoft.Design",
"CA1031:DoNotCatchGeneralExceptionTypes",
Justification = "User code can throw any exception type.")]
- public static void ExecuteJobAndWriteResults<T>(IIgniteInternal
ignite, PlatformMemoryStream stream, T job,
+ public static void ExecuteJobAndWriteResults<T>(
+ IIgniteInternal ignite,
+ IComputeTaskSession taskSes,
+ PlatformMemoryStream stream,
+ T job,
Func<T, object> execFunc)
{
Debug.Assert(stream != null);
@@ -46,7 +52,7 @@ namespace Apache.Ignite.Core.Impl.Compute
Debug.Assert(execFunc != null);
// 0. Inject resources.
- InjectResources(ignite, job);
+ InjectResources(ignite, taskSes, job);
// 1. Execute job.
object res;
@@ -84,14 +90,18 @@ namespace Apache.Ignite.Core.Impl.Compute
/// <summary>
/// Performs compute-specific resource injection.
/// </summary>
- public static void InjectResources(IIgniteInternal ignite, object job)
+ public static void InjectResources(IIgniteInternal ignite,
IComputeTaskSession computeTaskSession, object job)
{
var injector = job as IComputeResourceInjector;
+ // Task session is injected only into jobs that do not implement IComputeResourceInjector.
if (injector != null)
injector.Inject(ignite);
else
+ {
ResourceProcessor.Inject(job, ignite);
+ ResourceProcessor.InjectComputeTaskSession(job,
computeTaskSession);
+ }
}
}
}
\ No newline at end of file
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs
index e2bed73e195..32d301d4ecd 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs
@@ -43,8 +43,9 @@ namespace Apache.Ignite.Core.Impl.Compute
/// Perform map step.
/// </summary>
/// <param name="stream">Stream with IN data (topology info) and for
OUT data (map result).</param>
+ /// <param name="taskSes">Optional compute task session.</param>
/// <returns>Map with produced jobs.</returns>
- void Map(PlatformMemoryStream stream);
+ void Map(PlatformMemoryStream stream, IComputeTaskSession taskSes);
/// <summary>
/// Process local job result.
@@ -104,6 +105,9 @@ namespace Apache.Ignite.Core.Impl.Compute
/** Task future. */
private readonly Future<TR> _fut = new Future<TR>();
+ /** Resource descriptor. */
+ private readonly ResourceTypeDescriptor _resDesc;
+
/** Jobs whose results are cached. */
private ISet<object> _resJobs;
@@ -125,23 +129,26 @@ namespace Apache.Ignite.Core.Impl.Compute
_compute = compute;
_arg = arg;
_task = task;
-
- ResourceTypeDescriptor resDesc =
ResourceProcessor.Descriptor(task.GetType());
+ _resDesc = ResourceProcessor.Descriptor(task.GetType());
IComputeResourceInjector injector = task as
IComputeResourceInjector;
if (injector != null)
injector.Inject(grid);
else
- resDesc.InjectIgnite(task, grid);
+ _resDesc.InjectIgnite(task, grid);
- _resCache = !resDesc.TaskNoResultCache;
+ _resCache = !_resDesc.TaskNoResultCache;
+ TaskSessionFullSupport = _resDesc.TaskSessionFullSupport;
}
+
+ /// <inheritdoc cref="ResourceTypeDescriptor.TaskSessionFullSupport"/>
+ public bool TaskSessionFullSupport { get; }
/** <inheritDoc /> */
[SuppressMessage("Microsoft.Design",
"CA1031:DoNotCatchGeneralExceptionTypes",
Justification = "User code can throw any exception")]
- public void Map(PlatformMemoryStream stream)
+ public void Map(PlatformMemoryStream stream, IComputeTaskSession
taskSes)
{
IList<IClusterNode> subgrid;
@@ -149,6 +156,9 @@ namespace Apache.Ignite.Core.Impl.Compute
var ignite = (Ignite) prj.Ignite;
+ // 0. Inject session
+ _resDesc.InjectTaskSession(_task, taskSes);
+
// 1. Unmarshal topology info if topology changed.
var reader = prj.Marshaller.StartUnmarshal(stream);
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskSession.cs
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskSession.cs
new file mode 100644
index 00000000000..4acbee11095
--- /dev/null
+++
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskSession.cs
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute
+{
+ using System.Collections.Generic;
+ using Apache.Ignite.Core.Compute;
+ using Binary;
+
+ /// <summary>
+ /// Implements <see cref="IComputeTaskSession"/> by delegating the
implementation to the Java side.
+ /// </summary>
+ internal class ComputeTaskSession : PlatformTargetAdapter,
IComputeTaskSession
+ {
+ /// <summary>
+ /// Operation codes
+ /// </summary>
+ private enum Op
+ {
+ GetAttribute = 1,
+ SetAttributes = 2
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ComputeTaskSession"/>
class.
+ /// </summary>
+ public ComputeTaskSession(IPlatformTargetInternal target) :
base(target)
+ {
+ }
+
+ /// <inheritdoc />
+ public TV GetAttribute<TK, TV>(TK key) =>
+ DoOutInOp<TV>((int) Op.GetAttribute, w => w.Write(key));
+
+ /// <inheritdoc />
+ public void SetAttributes<TK, TV>(params KeyValuePair<TK, TV>[] attrs)
=>
+ DoOutOp((int) Op.SetAttributes, writer =>
writer.WriteDictionary(attrs));
+ }
+}
\ No newline at end of file
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskSessionFullSupportAttribute.cs
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskSessionFullSupportAttribute.cs
new file mode 100644
index 00000000000..01541f76005
--- /dev/null
+++
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskSessionFullSupportAttribute.cs
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Compute
+{
+ using System;
+ using static System.AttributeTargets;
+
+ /// <summary>
+ /// Enables distributing <see cref="IComputeTaskSession"/>'s attributes
from
+ /// <see cref="IComputeTask{TArg,TJobRes,TRes}"/> to <see
cref="IComputeJob{TRes}"/> that the task creates.
+ /// <see cref="IComputeTask{TArg,TJobRes,TRes}"/> implementations must be
annotated with the
+ /// <see cref="ComputeTaskSessionFullSupportAttribute"/> to enable the
features depending on the
+ /// <see cref="IComputeTaskSession"/> attributes.
+ /// By default attributes and checkpoints are disabled for performance
reasons.
+ /// </summary>
+ [AttributeUsage(Class | Struct)]
+ public sealed class ComputeTaskSessionFullSupportAttribute : Attribute
+ {
+ // No-op.
+ }
+}
\ No newline at end of file
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Resource/ResourceProcessor.cs
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Resource/ResourceProcessor.cs
index 6e37006968b..9846372ecce 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Resource/ResourceProcessor.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Resource/ResourceProcessor.cs
@@ -15,6 +15,8 @@
* limitations under the License.
*/
+using Apache.Ignite.Core.Compute;
+
namespace Apache.Ignite.Core.Impl.Resource
{
using System;
@@ -91,5 +93,20 @@ namespace Apache.Ignite.Core.Impl.Resource
Descriptor(store.GetType()).InjectStoreSession(store, ses);
}
+
+ /// <summary>
+ /// Injects compute task session into a job or task.
+ /// </summary>
+ /// <param name="target">Compute job or task</param>
+ /// <param name="taskSes">Compute task session</param>
+ public static void InjectComputeTaskSession(object target,
IComputeTaskSession taskSes)
+ {
+ if (target != null)
+ {
+ var desc = Descriptor(target.GetType());
+
+ desc.InjectTaskSession(target, taskSes);
+ }
+ }
}
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Resource/ResourceTypeDescriptor.cs
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Resource/ResourceTypeDescriptor.cs
index 609ccfd565a..ab43723cdde 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Resource/ResourceTypeDescriptor.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Resource/ResourceTypeDescriptor.cs
@@ -36,17 +36,26 @@ namespace Apache.Ignite.Core.Impl.Resource
/** Attribute type: StoreSessionResourceAttribute. */
private static readonly Type TypAttrStoreSes =
typeof(StoreSessionResourceAttribute);
+ /** Attribute type: TaskSessionResourceAttribute. */
+ private static readonly Type TypAttrTaskSes =
typeof(TaskSessionResourceAttribute);
+
/** Type: IGrid. */
private static readonly Type TypIgnite = typeof(IIgnite);
/** Type: ICacheStoreSession. */
private static readonly Type TypStoreSes = typeof (ICacheStoreSession);
+ /** Type: IComputeTaskSession. */
+ private static readonly Type TypTaskSes = typeof (IComputeTaskSession);
+
/** Type: ComputeTaskNoResultCacheAttribute. */
private static readonly Type TypComputeTaskNoResCache =
typeof(ComputeTaskNoResultCacheAttribute);
+ /** Type: ComputeTaskSessionFullSupportAttribute. */
+ private static readonly Type TypComputeTaskSessionFullSupport =
typeof(ComputeTaskSessionFullSupportAttribute);
+
/** Cached binding flags. */
- private const BindingFlags Flags = BindingFlags.Instance |
BindingFlags.Public |
+ private const BindingFlags Flags = BindingFlags.Instance |
BindingFlags.Public |
BindingFlags.NonPublic | BindingFlags.DeclaredOnly;
/** Ignite injectors. */
@@ -54,10 +63,13 @@ namespace Apache.Ignite.Core.Impl.Resource
/** Session injectors. */
private readonly IList<IResourceInjector> _storeSesInjectors;
-
+
+ /** Compute task session injectors. */
+ private readonly IList<IResourceInjector> _taskSesInjectors;
+
/** Task "no result cache" flag. */
private readonly bool _taskNoResCache;
-
+
/// <summary>
/// Constructor.
/// </summary>
@@ -66,20 +78,23 @@ namespace Apache.Ignite.Core.Impl.Resource
{
Collector gridCollector = new Collector(TypAttrIgnite, TypIgnite);
Collector storeSesCollector = new Collector(TypAttrStoreSes,
TypStoreSes);
+ var taskSesCollector = new Collector(TypAttrTaskSes, TypTaskSes);
Type curType = type;
while (curType != null)
{
- CreateInjectors(curType, gridCollector, storeSesCollector);
+ CreateInjectors(curType, gridCollector, storeSesCollector,
taskSesCollector);
curType = curType.BaseType;
}
_igniteInjectors = gridCollector.Injectors;
_storeSesInjectors = storeSesCollector.Injectors;
+ _taskSesInjectors = taskSesCollector.Injectors;
_taskNoResCache = ContainsAttribute(type,
TypComputeTaskNoResCache, true);
+ TaskSessionFullSupport = ContainsAttribute(type,
TypComputeTaskSessionFullSupport, true);
}
/// <summary>
@@ -102,6 +117,19 @@ namespace Apache.Ignite.Core.Impl.Resource
Inject0(target, ses, _storeSesInjectors);
}
+ /// <summary>
+ /// Inject compute task session.
+ /// </summary>
+ /// <param name="target">Target.</param>
+ /// <param name="ses">Compute task session.</param>
+ public void InjectTaskSession(object target, IComputeTaskSession ses)
+ {
+ if (ses != null)
+ {
+ Inject0(target, ses, _taskSesInjectors);
+ }
+ }
+
/// <summary>
/// Perform injection.
/// </summary>
@@ -127,7 +155,10 @@ namespace Apache.Ignite.Core.Impl.Resource
return _taskNoResCache;
}
}
-
+
+ /// <summary>Whether the described type is marked with <see cref="ComputeTaskSessionFullSupportAttribute"/>.</summary>
+ public bool TaskSessionFullSupport { get; }
+
/// <summary>
/// Create gridInjectors for the given type.
/// </summary>
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
index b2fcc96b009..6c5dde10661 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
@@ -493,11 +493,12 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
#region IMPLEMENTATION: COMPUTE
- private long ComputeTaskMap(long memPtr)
+ private long ComputeTaskMap(long memPtr, long ignored, long ignored2,
void* sesPtr)
{
using (PlatformMemoryStream stream =
IgniteManager.Memory.Get(memPtr).GetStream())
{
- Task(stream.ReadLong()).Map(stream);
+ var ses = TaskSession(sesPtr);
+ Task(stream.ReadLong()).Map(stream, ses);
return 0;
}
@@ -562,15 +563,18 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
}
}
- private long ComputeJobExecuteLocal(long jobPtr, long cancel, long
unused, void* arg)
+ private long ComputeJobExecuteLocal(long jobPtr, long cancel, long
unused, void* sesPtr)
{
- Job(jobPtr).ExecuteLocal(cancel == 1);
+ var ses = TaskSession(sesPtr);
+
+ Job(jobPtr).ExecuteLocal(cancel == 1, ses);
return 0;
}
- private long ComputeJobExecute(long memPtr)
+ private long ComputeJobExecute(long memPtr, long ignored, long
ignored2, void* sesPtr)
{
+ var ses = TaskSession(sesPtr);
using (PlatformMemoryStream stream =
IgniteManager.Memory.Get(memPtr).GetStream())
{
var job = Job(stream.ReadLong());
@@ -579,7 +583,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
stream.Reset();
- job.ExecuteRemote(stream, cancel);
+ job.ExecuteRemote(stream, cancel, ses);
}
return 0;
@@ -615,6 +619,18 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
return _handleRegistry.Get<IComputeTaskHolder>(taskPtr);
}
+ private IComputeTaskSession TaskSession(void* sesPtr)
+ {
+ if (sesPtr == null)
+ {
+ return null;
+ }
+
+ var sesRef = _jvm.AttachCurrentThread().NewGlobalRef((IntPtr)
sesPtr);
+ var sesTarget = new PlatformJniTarget(sesRef, _ignite.Marshaller);
+ return new ComputeTaskSession(sesTarget);
+ }
+
/// <summary>
/// Get compute job using its GC handle pointer.
/// </summary>
@@ -640,7 +656,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
stream.Reset();
var invoker =
DelegateTypeDescriptor.GetComputeOutFunc(func.GetType());
- ComputeRunner.ExecuteJobAndWriteResults(_ignite, stream, func,
invoker);
+ ComputeRunner.ExecuteJobAndWriteResults(_ignite, null, stream,
func, invoker);
}
return 0;
@@ -660,7 +676,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
stream.Reset();
- ComputeRunner.ExecuteJobAndWriteResults(_ignite, stream,
action, act =>
+ ComputeRunner.ExecuteJobAndWriteResults(_ignite, null, stream,
action, act =>
{
act.Invoke();
return null;
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Core/Resource/TaskSessionResourceAttribute.cs
b/modules/platforms/dotnet/Apache.Ignite.Core/Resource/TaskSessionResourceAttribute.cs
new file mode 100644
index 00000000000..905bb271deb
--- /dev/null
+++
b/modules/platforms/dotnet/Apache.Ignite.Core/Resource/TaskSessionResourceAttribute.cs
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Resource
+{
+ using System;
+ using Compute;
+ using static System.AttributeTargets;
+
+ /// <summary>
+ /// Injects <see cref="IComputeTaskSession"/> into implementations of
+ /// <see cref="IComputeTask{TJobRes,TReduceRes}"/> and <see
cref="IComputeJob{TRes}"/>.
+ /// </summary>
+ [AttributeUsage(Field | Method | Property)]
+ public sealed class TaskSessionResourceAttribute : Attribute
+ {
+ // No-op.
+ }
+}