This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new a27d21bff3c IGNITE-22349 .NET: Add compute task session support 
(#11373)
a27d21bff3c is described below

commit a27d21bff3cd5563ab5e3f55cae52df76a917cc8
Author: kukushal <[email protected]>
AuthorDate: Wed Jun 26 11:09:33 2024 +0300

    IGNITE-22349 .NET: Add compute task session support (#11373)
    
    Add compute task session support to .NET. This allows the user to change 
the priority of a compute job via `grid.task.priority` attribute (see 
[docs](https://ignite.apache.org/docs/latest/distributed-computing/job-scheduling#priority-ordering)).
    
    * The new `IComputeTaskSession` interface supports the `SetAttributes` and 
`GetAttribute` operations with the same
    signature as their Java `ComputeTaskSession` counterparts.
      * The `ComputeTaskSession` implementation delegates operations to the 
Java side via `PlatformJniTarget`, which
      receives the unmanaged session pointer in the JNI `ComputeTaskMap` and 
`ComputeJobExecute` callbacks.
      * On the Java side the new `PlatformComputeTaskSession` class receives 
the JNI calls and delegates operations to
      wrapped `ComputeTaskSession`.
      * The `ComputeTaskSession` on the Java side in injected into 
`PlatformFullJob` or `PlatformFullTask`. At first glance
      the injection might seem slow. However injectors are cached so subsequent 
injections would be as fast as setting the
      field directly. The alternative approach to pass `ComputeTaskSession` 
directly to the `PlatformFullJob` and
      `PlatformFullTask` constructors seems to required lots of code changes.
    * The new .NET `ComputeTaskSessionFullSupportAttribute` explicitly enables 
the compute session attributes API in the
    same way the Java's 
[ComputeTaskSessionFullSupport](https://github.com/apache/ignite/blob/2.16.0/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSessionFullSupport.java)
 annotation does. This flag is passed to Java as the `Execute` JNI operation 
parameter and stored in the `PlatformFullTask`.
    * The new .NET `TaskSessionResourceAttribute` injects the 
`IComputeTaskSession` into implementations of `IComputeTask` and `IComputeJob`.
    * Added automated tests for distributing session attributes on the same and 
different nodes.
    * Documentation: added new "C#/.NET" tab in the `MyUrgentTask` code snippet 
in the [Priority 
Ordering](https://ignite.apache.org/docs/latest/distributed-computing/job-scheduling#priority-ordering)
 section.
---
 docs/_docs/code-snippets/dotnet/JobScheduling.cs   |  76 ++++++++++++
 .../distributed-computing/job-scheduling.adoc      |  12 +-
 .../platform/callback/PlatformCallbackGateway.java |  17 ++-
 .../platform/compute/PlatformAbstractJob.java      |   6 +-
 .../platform/compute/PlatformClosureJob.java       |   4 +-
 .../platform/compute/PlatformCompute.java          |   4 +-
 .../compute/PlatformComputeTaskSession.java        |  85 ++++++++++++++
 .../platform/compute/PlatformFullJob.java          |  16 ++-
 .../platform/compute/PlatformFullTask.java         |  32 +++++-
 .../processors/task/GridTaskProcessor.java         |   6 +-
 .../cpp/core/src/impl/ignite_environment.cpp       |  18 +--
 .../Compute/ComputeTaskSessionTest.cs              | 127 +++++++++++++++++++++
 .../Compute/IComputeTaskSession.cs                 |  39 +++++++
 .../Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs |   1 +
 .../Apache.Ignite.Core/Impl/Compute/ComputeJob.cs  |  10 ++
 .../Impl/Compute/ComputeJobHolder.cs               |  12 +-
 .../Impl/Compute/ComputeRunner.cs                  |  16 ++-
 .../Impl/Compute/ComputeTaskHolder.cs              |  22 +++-
 .../Impl/Compute/ComputeTaskSession.cs             |  53 +++++++++
 .../ComputeTaskSessionFullSupportAttribute.cs      |  36 ++++++
 .../Impl/Resource/ResourceProcessor.cs             |  17 +++
 .../Impl/Resource/ResourceTypeDescriptor.cs        |  41 ++++++-
 .../Impl/Unmanaged/UnmanagedCallbacks.cs           |  32 ++++--
 .../Resource/TaskSessionResourceAttribute.cs       |  33 ++++++
 24 files changed, 661 insertions(+), 54 deletions(-)

diff --git a/docs/_docs/code-snippets/dotnet/JobScheduling.cs 
b/docs/_docs/code-snippets/dotnet/JobScheduling.cs
new file mode 100644
index 00000000000..cd11e61df8a
--- /dev/null
+++ b/docs/_docs/code-snippets/dotnet/JobScheduling.cs
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Apache.Ignite.Core;
+using Apache.Ignite.Core.Compute;
+using Apache.Ignite.Core.Resource;
+
+namespace dotnet_helloworld
+{
+    public class JobScheduling
+    {
+        public void Priority()
+        {
+            // tag::priority[]
+            // PriorityQueueCollisionSpi must be configured in the Spring XML 
configuration file ignite-helloworld.xml
+            var cfg = new IgniteConfiguration
+            {
+                SpringConfigUrl = "ignite-helloworld.xml"
+            };
+
+            // Start a node.
+            using var ignite = Ignition.Start(cfg);
+            // end::priority[]
+        }
+
+        // tag::task-priority[]
+        // Compute tasks must be annotated with the 
ComputeTaskSessionFullSupport attribute to support distributing
+        // the task's session attributes to compute jobs that the task creates.
+        [ComputeTaskSessionFullSupport]
+        public class MyUrgentTask : ComputeTaskSplitAdapter<int, bool, bool>
+        {
+            // Auto-injected task session.
+            [TaskSessionResource] private IComputeTaskSession _taskSes;
+
+            /// <inheritdoc />
+            protected override ICollection<IComputeJob<bool>> Split(int 
gridSize, int arg)
+            {
+                // Set high task priority.
+                _taskSes.SetAttribute("grid.task.priority", 10);
+
+                var jobs = new List<IComputeJob<bool>>(gridSize);
+
+                for (var i = 1; i <= gridSize; i++)
+                {
+                    jobs.Add(new MyUrgentJob());
+                }
+
+                // These jobs will be executed with higher priority.
+                return jobs;
+            }
+
+            /// <inheritdoc />
+            public override bool Reduce(IList<IComputeJobResult<bool>> 
results) => results.All(r => r.Data);
+        }
+        // end::task-priority[]
+
+        private class MyUrgentJob : ComputeJobAdapter<bool>
+        {
+            public override bool Execute() => true;
+        }
+    }
+}
\ No newline at end of file
diff --git a/docs/_docs/distributed-computing/job-scheduling.adoc 
b/docs/_docs/distributed-computing/job-scheduling.adoc
index c242a69c44c..fffde3274bd 100644
--- a/docs/_docs/distributed-computing/job-scheduling.adoc
+++ b/docs/_docs/distributed-computing/job-scheduling.adoc
@@ -15,6 +15,7 @@
 = Job Scheduling
 
 :javaFile: {javaCodeDir}/JobScheduling.java
+:csharpFile: {csharpCodeDir}/JobScheduling.cs
 
 When jobs arrive at the destination node, they are submitted to a thread pool 
and scheduled for execution in random order.
 However, you can change job ordering by configuring `CollisionSpi`.
@@ -70,9 +71,16 @@ tab:C++[unsupported]
 
 Task priorities are set in the 
link:distributed-computing/map-reduce#distributed-task-session[task session] 
via the `grid.task.priority` attribute. If no priority is assigned to a task, 
then the default priority of 0 is used.
 
-
+[tabs]
+--
+tab:Java[]
 [source, java]
 ----
 include::{javaFile}[tag=task-priority,indent=0]
 ----
-
+tab:C#/.NET[]
+[source,csharp]
+----
+include::{csharpFile}[tag=task-priority,indent=0]
+----
+--
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
index 5d6e57d3fae..cd54aed9cdf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
@@ -181,12 +181,14 @@ public class PlatformCallbackGateway {
      * Perform native task map. Do not throw exceptions, serializing them to 
the output stream instead.
      *
      * @param memPtr Memory pointer.
+     * @param ses Platform compute task session proxy.
      */
-    public void computeTaskMap(long memPtr) {
+    public void computeTaskMap(long memPtr, PlatformTargetProxy ses) {
         enter();
 
         try {
-            PlatformCallbackUtils.inLongOutLong(envPtr, 
PlatformCallbackOp.ComputeTaskMap, memPtr);
+            PlatformCallbackUtils.inLongLongLongObjectOutLong(
+                envPtr, PlatformCallbackOp.ComputeTaskMap, memPtr, 0, 0, ses);
         }
         finally {
             leave();
@@ -304,13 +306,14 @@ public class PlatformCallbackGateway {
      *
      * @param jobPtr Job pointer.
      * @param cancel Cancel flag.
+     * @param ses Platform compute task session proxy.
      */
-    public void computeJobExecuteLocal(long jobPtr, long cancel) {
+    public void computeJobExecuteLocal(long jobPtr, long cancel, 
PlatformTargetProxy ses) {
         enter();
 
         try {
             PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
-                PlatformCallbackOp.ComputeJobExecuteLocal, jobPtr, cancel, 0, 
null);
+                PlatformCallbackOp.ComputeJobExecuteLocal, jobPtr, cancel, 0, 
ses);
         }
         finally {
             leave();
@@ -321,12 +324,14 @@ public class PlatformCallbackGateway {
      * Execute native job on a node other than where it was created.
      *
      * @param memPtr Memory pointer.
+     * @param ses Platform compute task session proxy.
      */
-    public void computeJobExecute(long memPtr) {
+    public void computeJobExecute(long memPtr, PlatformTargetProxy ses) {
         enter();
 
         try {
-            PlatformCallbackUtils.inLongOutLong(envPtr, 
PlatformCallbackOp.ComputeJobExecute, memPtr);
+            PlatformCallbackUtils.inLongLongLongObjectOutLong(
+                envPtr, PlatformCallbackOp.ComputeJobExecute, memPtr, 0, 0, 
ses);
         }
         finally {
             leave();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
index fbde0d8aa9b..eb6c36838fc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
@@ -23,6 +23,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.binary.BinaryRawWriterEx;
 import org.apache.ignite.internal.processors.platform.PlatformContext;
 import org.apache.ignite.internal.processors.platform.PlatformProcessor;
+import org.apache.ignite.internal.processors.platform.PlatformTargetProxy;
 import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
 import 
org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
 import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
@@ -130,13 +131,14 @@ public abstract class PlatformAbstractJob implements 
PlatformJob, Externalizable
      *
      * @param ctx Context.
      * @param cancel Cancel flag.
+     * @param ses Platform compute task session proxy.
      * @return Result.
      */
-    protected Object runLocal(PlatformContext ctx, boolean cancel) {
+    protected Object runLocal(PlatformContext ctx, boolean cancel, 
PlatformTargetProxy ses) {
         // Local job, must execute it with respect to possible concurrent task 
completion.
         if (task.onJobLock()) {
             try {
-                ctx.gateway().computeJobExecuteLocal(ptr, cancel ? 1 : 0);
+                ctx.gateway().computeJobExecuteLocal(ptr, cancel ? 1 : 0, ses);
 
                 return LOC_JOB_RES;
             }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
index 3bcb54995bf..3c8db258fe0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
@@ -73,7 +73,7 @@ public class PlatformClosureJob extends PlatformAbstractJob {
 
                 out.synchronize();
 
-                ctx.gateway().computeJobExecute(mem.pointer());
+                ctx.gateway().computeJobExecute(mem.pointer(), null);
 
                 PlatformInputStream in = mem.input();
 
@@ -91,7 +91,7 @@ public class PlatformClosureJob extends PlatformAbstractJob {
             // Local job execution.
             assert ptr != 0;
 
-            return runLocal(ctx, false);
+            return runLocal(ctx, false, null);
         }
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
index 90fe4ff28c4..5fef0eacb6b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
@@ -163,8 +163,10 @@ public class PlatformCompute extends 
PlatformAbstractTarget {
                 long taskPtr = reader.readLong();
                 long topVer = reader.readLong();
                 String taskName = reader.readString();
+                boolean taskSesFullSupport = reader.readBoolean();
 
-                final PlatformFullTask task = new 
PlatformFullTask(platformCtx, platformGrp, taskPtr, topVer, taskName);
+                final PlatformFullTask task = new PlatformFullTask(
+                    platformCtx, platformGrp, taskPtr, topVer, taskName, 
taskSesFullSupport);
 
                 return executeNative0(task);
             }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformComputeTaskSession.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformComputeTaskSession.java
new file mode 100644
index 00000000000..aee9888a3bf
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformComputeTaskSession.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.compute.ComputeTaskSession;
+import org.apache.ignite.internal.binary.BinaryRawReaderEx;
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+
+import static 
org.apache.ignite.internal.processors.platform.utils.PlatformUtils.readMap;
+
+/** {@link ComputeTaskSession} platform wrapper. */
+public class PlatformComputeTaskSession extends PlatformAbstractTarget {
+    /** "get attribute" operation code. */
+    private static final int OP_GET_ATTRIBUTE = 1;
+
+    /** "set attributes" operation code. */
+    private static final int OP_SET_ATTRIBUTES = 2;
+
+    /** Underlying compute task session. */
+    private final ComputeTaskSession ses;
+
+    /**
+     * Constructor.
+     *
+     * @param platformCtx Context.
+     * @param ses         Underlying compute task session
+     */
+    public PlatformComputeTaskSession(final PlatformContext platformCtx, final 
ComputeTaskSession ses) {
+        super(platformCtx);
+
+        this.ses = ses;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long processInStreamOutLong(
+        final int type, final BinaryRawReaderEx reader, final PlatformMemory 
mem) throws IgniteCheckedException {
+
+        if (type == OP_SET_ATTRIBUTES) {
+            final Map<?, ?> attrs = readMap(reader);
+
+            ses.setAttributes(attrs);
+
+            return TRUE;
+        }
+
+        return super.processInStreamOutLong(type, reader, mem);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void processInStreamOutStream(
+        final int type, final BinaryRawReaderEx reader, final 
BinaryRawWriterEx writer) throws IgniteCheckedException {
+
+        if (type == OP_GET_ATTRIBUTE) {
+            final Object key = reader.readObjectDetached();
+
+            final Object val = ses.getAttribute(key);
+
+            writer.writeObjectDetached(val);
+
+            return;
+        }
+
+        super.processInStreamOutStream(type, reader, writer);
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
index 4bf3b2f9d70..83a96dd0f8b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
@@ -22,13 +22,18 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.compute.ComputeTaskSession;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.processors.platform.PlatformContext;
 import org.apache.ignite.internal.processors.platform.PlatformProcessor;
+import org.apache.ignite.internal.processors.platform.PlatformTarget;
+import org.apache.ignite.internal.processors.platform.PlatformTargetProxy;
+import org.apache.ignite.internal.processors.platform.PlatformTargetProxyImpl;
 import 
org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
 import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
 import 
org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
 import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+import org.apache.ignite.resources.TaskSessionResource;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -64,6 +69,10 @@ public class PlatformFullJob extends PlatformAbstractJob {
     /** Serialized job. */
     private transient byte state;
 
+    /** Task session of this job. */
+    @TaskSessionResource
+    private transient ComputeTaskSession ses;
+
     /**
      * {@link Externalizable} support.
      */
@@ -111,8 +120,11 @@ public class PlatformFullJob extends PlatformAbstractJob {
         }
 
         try {
+            final PlatformTarget platformSes = new 
PlatformComputeTaskSession(ctx, ses);
+            final PlatformTargetProxy platformSesProxy = new 
PlatformTargetProxyImpl(platformSes, ctx);
+
             if (task != null)
-                return runLocal(ctx, cancel);
+                return runLocal(ctx, cancel, platformSesProxy);
             else {
                 try (PlatformMemory mem = ctx.memory().allocate()) {
                     PlatformOutputStream out = mem.output();
@@ -122,7 +134,7 @@ public class PlatformFullJob extends PlatformAbstractJob {
 
                     out.synchronize();
 
-                    ctx.gateway().computeJobExecute(mem.pointer());
+                    ctx.gateway().computeJobExecute(mem.pointer(), 
platformSesProxy);
 
                     PlatformInputStream in = mem.input();
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
index fd799b6b46f..40beeea1328 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
@@ -26,15 +26,20 @@ import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.ComputeTaskNoResultCache;
+import org.apache.ignite.compute.ComputeTaskSession;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.binary.BinaryRawWriterEx;
 import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.PlatformTarget;
+import org.apache.ignite.internal.processors.platform.PlatformTargetProxy;
+import org.apache.ignite.internal.processors.platform.PlatformTargetProxyImpl;
 import 
org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
 import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
 import 
org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManager;
 import 
org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.TaskSessionResource;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -55,6 +60,13 @@ public final class PlatformFullTask extends 
PlatformAbstractTask {
     /** Platform task name. */
     private final String taskName;
 
+    /** {@code true} if distribution of the session attributes should be 
enabled. */
+    private final boolean taskSesFullSupport;
+
+    /** The task session. */
+    @TaskSessionResource
+    private ComputeTaskSession ses;
+
     /**
      * Constructor.
      *
@@ -63,13 +75,21 @@ public final class PlatformFullTask extends 
PlatformAbstractTask {
      * @param taskPtr Pointer to the task in the native platform.
      * @param topVer Initial topology version.
      * @param taskName Task name.
+     * @param taskSesFullSupport {@code true} if distribution of the session 
attributes should be enabled.
      */
-    public PlatformFullTask(PlatformContext ctx, ClusterGroup grp, long 
taskPtr, long topVer, String taskName) {
+    public PlatformFullTask(
+        PlatformContext ctx,
+        ClusterGroup grp,
+        long taskPtr,
+        long topVer,
+        String taskName,
+        boolean taskSesFullSupport) {
         super(ctx, taskPtr);
 
         this.grp = grp;
         this.topVer = topVer;
         this.taskName = taskName;
+        this.taskSesFullSupport = taskSesFullSupport;
     }
 
     /** {@inheritDoc} */
@@ -86,6 +106,9 @@ public final class PlatformFullTask extends 
PlatformAbstractTask {
 
             PlatformMemoryManager memMgr = ctx.memory();
 
+            final PlatformTarget platformSes = new 
PlatformComputeTaskSession(ctx, ses);
+            final PlatformTargetProxy platformSesProxy = new 
PlatformTargetProxyImpl(platformSes, ctx);
+
             try (PlatformMemory mem = memMgr.allocate()) {
                 PlatformOutputStream out = mem.output();
 
@@ -97,7 +120,7 @@ public final class PlatformFullTask extends 
PlatformAbstractTask {
 
                 out.synchronize();
 
-                ctx.gateway().computeTaskMap(mem.pointer());
+                ctx.gateway().computeTaskMap(mem.pointer(), platformSesProxy);
 
                 PlatformInputStream in = mem.input();
 
@@ -113,6 +136,11 @@ public final class PlatformFullTask extends 
PlatformAbstractTask {
         }
     }
 
+    /** {@code true} if distribution of session attributes should be enabled. 
*/
+    public boolean taskSessionFullSupport() {
+        return taskSesFullSupport;
+    }
+
     /**
      * Write topology information.
      *
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 1761b7a1c20..099acc974ea 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -71,6 +71,7 @@ import 
org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupp
 import org.apache.ignite.internal.processors.job.ComputeJobStatusEnum;
 import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
 import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
+import org.apache.ignite.internal.processors.platform.compute.PlatformFullTask;
 import org.apache.ignite.internal.processors.task.monitor.ComputeGridMonitor;
 import org.apache.ignite.internal.processors.task.monitor.ComputeTaskStatus;
 import 
org.apache.ignite.internal.processors.task.monitor.ComputeTaskStatusSnapshot;
@@ -642,8 +643,9 @@ public class GridTaskProcessor extends GridProcessorAdapter 
implements IgniteCha
         if (log.isDebugEnabled())
             log.debug("Task deployment: " + dep);
 
-        boolean fullSup = dep != null && taskCls != null &&
-            dep.annotation(taskCls, ComputeTaskSessionFullSupport.class) != 
null;
+        boolean fullSup = (dep != null && taskCls != null &&
+            dep.annotation(taskCls, ComputeTaskSessionFullSupport.class) != 
null) ||
+            (task instanceof PlatformFullTask && 
((PlatformFullTask)task).taskSessionFullSupport());
 
         Collection<UUID> top = null;
 
diff --git a/modules/platforms/cpp/core/src/impl/ignite_environment.cpp 
b/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
index 4c84f162734..d845c458169 100644
--- a/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
+++ b/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
@@ -198,15 +198,6 @@ namespace ignite
                     break;
                 }
 
-                case OperationCallback::COMPUTE_JOB_EXECUTE:
-                {
-                    SharedPointer<InteropMemory> mem = 
env->Get()->GetMemory(val);
-
-                    env->Get()->ComputeJobExecute(mem);
-
-                    break;
-                }
-
                 case OperationCallback::COMPUTE_JOB_DESTROY:
                 {
                     env->Get()->ComputeJobDestroy(val);
@@ -319,6 +310,15 @@ namespace ignite
 
             switch (type)
             {
+                case OperationCallback::COMPUTE_JOB_EXECUTE:
+                {
+                    SharedPointer<InteropMemory> mem = 
env->Get()->GetMemory(val1);
+
+                    env->Get()->ComputeJobExecute(mem);
+
+                    break;
+                }
+
                 case OperationCallback::COMPUTE_JOB_EXECUTE_LOCAL:
                 {
                     env->Get()->ComputeJobExecuteLocal(val1);
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeTaskSessionTest.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeTaskSessionTest.cs
new file mode 100644
index 00000000000..341b636a78d
--- /dev/null
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeTaskSessionTest.cs
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Compute
+{
+    using System.Collections.Generic;
+    using System.Linq;
+    using Apache.Ignite.Core.Compute;
+    using Apache.Ignite.Core.Resource;
+    using NUnit.Framework;
+    using static TestUtils;
+
+    /// <summary>
+    /// Tests for <see cref="IComputeTaskSession"/>
+    /// </summary>
+    public class ComputeTaskSessionTest
+    {
+        /// <summary>
+        /// Data stored in a session by a task is available in the compute job 
created by the task on another node.
+        /// </summary>
+        [Test]
+        public void DistributesTaskSessionAttributeRemotely()
+        {
+            // Given an Ignite cluster consisting of server and client nodes
+            using var ignored = 
Ignition.Start(GetIgniteConfiguration("server1"));
+            using var ignite = 
Ignition.Start(GetIgniteConfiguration("client1", true));
+            
+            // When the user executes a task setting a session attribute and 
creating a job getting the attribute
+            const string attrName = "attr1";
+            const int attrValue = 123;
+            var task = new SessionAttributeSetterTask(attrName);
+            var sessionValue = ignite.GetCompute().Execute(task, attrValue);
+            
+            // Then the task returns the same attribute value
+            Assert.AreEqual(attrValue, sessionValue);
+        }
+
+        /// <summary>
+        /// Data stored in session by a task is available in the compute job 
created by the task on the same node.
+        /// </summary>
+        [Test]
+        public void DistributesTaskSessionAttributeLocally()
+        {
+            // Given a single node Ignite cluster
+            using var ignite = 
Ignition.Start(GetIgniteConfiguration("server1"));
+            
+            // When the user executes a task setting a session attribute and 
creating a job getting the attribute
+            const string attrName = "attr1";
+            const int attrValue = 123;
+            var task = new SessionAttributeSetterTask(attrName);
+            var sessionValue = ignite.GetCompute().Execute(task, attrValue);
+            
+            // Then the task returns the same attribute value
+            Assert.AreEqual(attrValue, sessionValue);
+        }
+
+        private static IgniteConfiguration GetIgniteConfiguration(string 
igniteName, bool isClient = false) =>
+            new IgniteConfiguration
+            {
+                ClientMode = isClient,
+                ConsistentId = igniteName,
+                IgniteInstanceName = igniteName,
+                DiscoverySpi = GetStaticDiscovery(),
+                JvmOptions = TestJavaOptions()
+            };
+
+        /// <summary>
+        /// Sets the specified session attribute and creates one <see 
cref="SessionAttributeGetterJob"/>.
+        /// </summary>
+        [ComputeTaskSessionFullSupport]
+        private class SessionAttributeSetterTask : 
ComputeTaskSplitAdapter<int, int, int>
+        {
+            private readonly string _attrName;
+#pragma warning disable 649
+            [TaskSessionResource] private IComputeTaskSession _taskSession;
+#pragma warning restore 649
+
+            public SessionAttributeSetterTask(string attrName)
+            {
+                _attrName = attrName;
+            }
+
+            /// <inheritdoc />
+            public override int Reduce(IList<IComputeJobResult<int>> results) 
=> results.Select(res => res.Data).Sum();
+
+            /// <inheritdoc />
+            protected override ICollection<IComputeJob<int>> Split(int 
gridSize, int attrValue)
+            {
+                _taskSession.SetAttributes(KeyValuePair.Create(_attrName, 
attrValue));
+                return new List<IComputeJob<int>> {new 
SessionAttributeGetterJob(_attrName)};
+            }
+        }
+
+        /// <summary>
+        /// Returns the specified session attribute.
+        /// </summary>
+        private class SessionAttributeGetterJob : ComputeJobAdapter<int>
+        {
+            private readonly string _attrName;
+#pragma warning disable 649
+            [TaskSessionResource] private IComputeTaskSession _taskSession;
+#pragma warning restore 649
+
+            public SessionAttributeGetterJob(string attrName)
+            {
+                _attrName = attrName;
+            }
+
+            /// <inheritdoc />
+            public override int Execute() => _taskSession.GetAttribute<string, 
int>(_attrName);
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeTaskSession.cs 
b/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeTaskSession.cs
new file mode 100644
index 00000000000..e94e6b3e95a
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeTaskSession.cs
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Compute
+{
+    using System.Collections.Generic;
+
+    /// <summary>
+    /// Stores custom compute task attributes. Specific compute task 
implementations must be annotated with the
+    /// <see cref="ComputeTaskSessionFullSupportAttribute"/> to enable 
distributing the task attributes to the compute
+    /// jobs that the task creates.
+    /// </summary>
+    public interface IComputeTaskSession
+    {
+        /// <summary>
+        /// Gets the value of the given key or <c>null</c> if the key does not 
exist.
+        /// </summary>
+        TV GetAttribute<TK, TV>(TK key);
+
+        /// <summary>
+        /// Stores the collection of attributes.
+        /// </summary>
+        void SetAttributes<TK, TV>(params KeyValuePair<TK, TV>[] attrs);
+    }
+}
\ No newline at end of file
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs 
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
index af2f793000a..8e9a6a1dd13 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
@@ -237,6 +237,7 @@ namespace Apache.Ignite.Core.Impl.Compute
                 s.WriteLong(ptr);
                 s.WriteLong(_prj.TopologyVersion);
                 s.WriteString(GetComputeExecutableName(task));
+                s.WriteBoolean(holder.TaskSessionFullSupport);
             });
 
             var future = holder.Future;
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs 
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs
index c94ebcdad36..f684b047e36 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs
@@ -130,6 +130,16 @@ namespace Apache.Ignite.Core.Impl.Compute
             ResourceProcessor.Inject(Job, (Ignite)ignite);
         }
 
+        /// <summary>
+        /// Injects compute task session into wrapped object.
+        /// </summary>
+        [TaskSessionResource]
+        public void InjectTaskSession(IComputeTaskSession taskSes)
+        {
+            // Propagate injection
+            ResourceProcessor.InjectComputeTaskSession(Job, taskSes);
+        }
+
         /// <summary>
         /// Gets the inner job.
         /// </summary>
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobHolder.cs 
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobHolder.cs
index 7d83380a8a3..228b66809fe 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobHolder.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobHolder.cs
@@ -15,6 +15,8 @@
  * limitations under the License.
  */
 
+using Apache.Ignite.Core.Compute;
+
 namespace Apache.Ignite.Core.Impl.Compute
 {
     using System;
@@ -71,11 +73,12 @@ namespace Apache.Ignite.Core.Impl.Compute
         /// Executes local job.
         /// </summary>
         /// <param name="cancel">Cancel flag.</param>
+        /// <param name="taskSes">Compute task session</param>
         [SuppressMessage("Microsoft.Design", 
"CA1031:DoNotCatchGeneralExceptionTypes",
             Justification = "User code can throw any exception type.")]
-        public void ExecuteLocal(bool cancel)
+        public void ExecuteLocal(bool cancel, IComputeTaskSession taskSes)
         {
-            ComputeRunner.InjectResources(_ignite, _job);
+            ComputeRunner.InjectResources(_ignite, taskSes, _job);
 
             var nodeId = _ignite.GetIgnite().GetCluster().GetLocalNode().Id;
 
@@ -99,9 +102,10 @@ namespace Apache.Ignite.Core.Impl.Compute
         /// </summary>
         /// <param name="cancel">Whether the job must be cancelled.</param>
         /// <param name="stream">Stream.</param>
-        public void ExecuteRemote(PlatformMemoryStream stream, bool cancel)
+        /// <param name="taskSes">Compute task session</param>
+        public void ExecuteRemote(PlatformMemoryStream stream, bool cancel, 
IComputeTaskSession taskSes)
         {
-            ComputeRunner.ExecuteJobAndWriteResults(_ignite, stream, _job, _ 
=> Execute0(cancel));
+            ComputeRunner.ExecuteJobAndWriteResults(_ignite, taskSes, stream, 
_job, _ => Execute0(cancel));
         }
 
         /// <summary>
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeRunner.cs 
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeRunner.cs
index 10170fbcfc6..0f57b2f4d8d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeRunner.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeRunner.cs
@@ -15,6 +15,8 @@
  * limitations under the License.
  */
 
+using Apache.Ignite.Core.Compute;
+
 namespace Apache.Ignite.Core.Impl.Compute
 {
     using System;
@@ -37,7 +39,11 @@ namespace Apache.Ignite.Core.Impl.Compute
         /// </summary>
         [SuppressMessage("Microsoft.Design", 
"CA1031:DoNotCatchGeneralExceptionTypes",
             Justification = "User code can throw any exception type.")]
-        public static void ExecuteJobAndWriteResults<T>(IIgniteInternal 
ignite, PlatformMemoryStream stream, T job,
+        public static void ExecuteJobAndWriteResults<T>(
+            IIgniteInternal ignite,
+            IComputeTaskSession taskSes,
+            PlatformMemoryStream stream,
+            T job,
             Func<T, object> execFunc)
         {
             Debug.Assert(stream != null);
@@ -46,7 +52,7 @@ namespace Apache.Ignite.Core.Impl.Compute
             Debug.Assert(execFunc != null);
             
             // 0. Inject resources.
-            InjectResources(ignite, job);
+            InjectResources(ignite, taskSes, job);
 
             // 1. Execute job.
             object res;
@@ -84,14 +90,18 @@ namespace Apache.Ignite.Core.Impl.Compute
         /// <summary>
         /// Performs compute-specific resource injection.
         /// </summary>
-        public static void InjectResources(IIgniteInternal ignite, object job)
+        public static void InjectResources(IIgniteInternal ignite, 
IComputeTaskSession computeTaskSession, object job)
         {
             var injector = job as IComputeResourceInjector;
 
+            // Injecting task session is supported only for 
             if (injector != null)
                 injector.Inject(ignite);
             else
+            {
                 ResourceProcessor.Inject(job, ignite);
+                ResourceProcessor.InjectComputeTaskSession(job, 
computeTaskSession);
+            }
         }
     }
 }
\ No newline at end of file
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs 
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs
index e2bed73e195..32d301d4ecd 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs
@@ -43,8 +43,9 @@ namespace Apache.Ignite.Core.Impl.Compute
         /// Perform map step.
         /// </summary>
         /// <param name="stream">Stream with IN data (topology info) and for 
OUT data (map result).</param>
+        /// <param name="taskSes">Optional compute task session</param>
         /// <returns>Map with produced jobs.</returns>
-        void Map(PlatformMemoryStream stream);
+        void Map(PlatformMemoryStream stream, IComputeTaskSession taskSes);
 
         /// <summary>
         /// Process local job result.
@@ -104,6 +105,9 @@ namespace Apache.Ignite.Core.Impl.Compute
         /** Task future. */
         private readonly Future<TR> _fut = new Future<TR>();
 
+        /** Resource descriptor. */
+        private readonly ResourceTypeDescriptor _resDesc;
+
         /** Jobs whose results are cached. */
         private ISet<object> _resJobs;
 
@@ -125,23 +129,26 @@ namespace Apache.Ignite.Core.Impl.Compute
             _compute = compute;
             _arg = arg;
             _task = task;
-
-            ResourceTypeDescriptor resDesc = 
ResourceProcessor.Descriptor(task.GetType());
+            _resDesc = ResourceProcessor.Descriptor(task.GetType());
 
             IComputeResourceInjector injector = task as 
IComputeResourceInjector;
 
             if (injector != null)
                 injector.Inject(grid);
             else
-                resDesc.InjectIgnite(task, grid);
+                _resDesc.InjectIgnite(task, grid);
 
-            _resCache = !resDesc.TaskNoResultCache;
+            _resCache = !_resDesc.TaskNoResultCache;
+            TaskSessionFullSupport = _resDesc.TaskSessionFullSupport;
         }
+        
+        /// <inheritdoc cref="ResourceTypeDescriptor.TaskSessionFullSupport"/>
+        public bool TaskSessionFullSupport { get; }
 
         /** <inheritDoc /> */
         [SuppressMessage("Microsoft.Design", 
"CA1031:DoNotCatchGeneralExceptionTypes",
             Justification = "User code can throw any exception")]
-        public void Map(PlatformMemoryStream stream)
+        public void Map(PlatformMemoryStream stream, IComputeTaskSession 
taskSes)
         {
             IList<IClusterNode> subgrid;
 
@@ -149,6 +156,9 @@ namespace Apache.Ignite.Core.Impl.Compute
 
             var ignite = (Ignite) prj.Ignite;
 
+            // 0. Inject session
+            _resDesc.InjectTaskSession(_task, taskSes);
+
             // 1. Unmarshal topology info if topology changed.
             var reader = prj.Marshaller.StartUnmarshal(stream);
 
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskSession.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskSession.cs
new file mode 100644
index 00000000000..4acbee11095
--- /dev/null
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskSession.cs
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute
+{
+    using System.Collections.Generic;
+    using Apache.Ignite.Core.Compute;
+    using Binary;
+
+    /// <summary>
+    /// Implements <see cref="IComputeTaskSession"/> by delegating the 
implementation to the Java side.
+    /// </summary>
+    internal class ComputeTaskSession : PlatformTargetAdapter, 
IComputeTaskSession
+    {
+        /// <summary>
+        /// Operation codes
+        /// </summary>
+        private enum Op
+        {
+            GetAttribute = 1,
+            SetAttributes = 2
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ComputeTaskSession"/> 
class.
+        /// </summary>
+        public ComputeTaskSession(IPlatformTargetInternal target) : 
base(target)
+        {
+        }
+
+        /// <inheritdoc />
+        public TV GetAttribute<TK, TV>(TK key) =>
+            DoOutInOp<TV>((int) Op.GetAttribute, w => w.Write(key));
+
+        /// <inheritdoc />
+        public void SetAttributes<TK, TV>(params KeyValuePair<TK, TV>[] attrs) 
=>
+            DoOutOp((int) Op.SetAttributes, writer => 
writer.WriteDictionary(attrs));
+    }
+}
\ No newline at end of file
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskSessionFullSupportAttribute.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskSessionFullSupportAttribute.cs
new file mode 100644
index 00000000000..01541f76005
--- /dev/null
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskSessionFullSupportAttribute.cs
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Compute
+{
+    using System;
+    using static System.AttributeTargets;
+
+    /// <summary>
+    /// Enables distributing <see cref="IComputeTaskSession"/>'s attributes 
from
+    /// <see cref="IComputeTask{TArg,TJobRes,TRes}"/> to <see 
cref="IComputeJob{TRes}"/> that the task creates.
+    /// <see cref="IComputeTask{TArg,TJobRes,TRes}"/> implementations must be 
annotated with the
+    /// <see cref="ComputeTaskSessionFullSupportAttribute"/> to enable the 
features depending on the
+    /// <see cref="IComputeTaskSession"/> attributes.
+    /// By default attributes and checkpoints are disabled for performance 
reasons.
+    /// </summary>
+    [AttributeUsage(Class | Struct)]
+    public sealed class ComputeTaskSessionFullSupportAttribute : Attribute
+    {
+        // No-op.
+    }
+}
\ No newline at end of file
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Resource/ResourceProcessor.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Resource/ResourceProcessor.cs
index 6e37006968b..9846372ecce 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Resource/ResourceProcessor.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Resource/ResourceProcessor.cs
@@ -15,6 +15,8 @@
  * limitations under the License.
  */
 
+using Apache.Ignite.Core.Compute;
+
 namespace Apache.Ignite.Core.Impl.Resource
 {
     using System;
@@ -91,5 +93,20 @@ namespace Apache.Ignite.Core.Impl.Resource
 
             Descriptor(store.GetType()).InjectStoreSession(store, ses);
         }
+
+        /// <summary>
+        /// Injects compute task session into a job or task.
+        /// </summary>
+        /// <param name="target">Compute job or task</param>
+        /// <param name="taskSes">Compute task session</param>
+        public static void InjectComputeTaskSession(object target, 
IComputeTaskSession taskSes)
+        {
+            if (target != null)
+            {
+                var desc = Descriptor(target.GetType());
+    
+                desc.InjectTaskSession(target, taskSes);
+            }
+        }
     }
 }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Resource/ResourceTypeDescriptor.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Resource/ResourceTypeDescriptor.cs
index 609ccfd565a..ab43723cdde 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Resource/ResourceTypeDescriptor.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Resource/ResourceTypeDescriptor.cs
@@ -36,17 +36,26 @@ namespace Apache.Ignite.Core.Impl.Resource
         /** Attribute type: StoreSessionResourceAttribute. */
         private static readonly Type TypAttrStoreSes = 
typeof(StoreSessionResourceAttribute);
 
+        /** Attribute type: TaskSessionResourceAttribute. */
+        private static readonly Type TypAttrTaskSes = 
typeof(TaskSessionResourceAttribute);
+
         /** Type: IGrid. */
         private static readonly Type TypIgnite = typeof(IIgnite);
 
         /** Type: ICacheStoreSession. */
         private static readonly Type TypStoreSes = typeof (ICacheStoreSession);
 
+        /** Type: IComputeTaskSession. */
+        private static readonly Type TypTaskSes = typeof (IComputeTaskSession);
+
         /** Type: ComputeTaskNoResultCacheAttribute. */
         private static readonly Type TypComputeTaskNoResCache = 
typeof(ComputeTaskNoResultCacheAttribute);
 
+        /** Type: ComputeTaskSessionFullSupportAttribute. */
+        private static readonly Type TypComputeTaskSessionFullSupport = 
typeof(ComputeTaskSessionFullSupportAttribute);
+
         /** Cached binding flags. */
-        private const BindingFlags Flags = BindingFlags.Instance | 
BindingFlags.Public | 
+        private const BindingFlags Flags = BindingFlags.Instance | 
BindingFlags.Public |
             BindingFlags.NonPublic | BindingFlags.DeclaredOnly;
 
         /** Ignite injectors. */
@@ -54,10 +63,13 @@ namespace Apache.Ignite.Core.Impl.Resource
 
         /** Session injectors. */
         private readonly IList<IResourceInjector> _storeSesInjectors;
-        
+
+        /** Compute task session injectors. */
+        private readonly IList<IResourceInjector> _taskSesInjectors;
+
         /** Task "no result cache" flag. */
         private readonly bool _taskNoResCache;
-        
+
         /// <summary>
         /// Constructor.
         /// </summary>
@@ -66,20 +78,23 @@ namespace Apache.Ignite.Core.Impl.Resource
         {
             Collector gridCollector = new Collector(TypAttrIgnite, TypIgnite);
             Collector storeSesCollector = new Collector(TypAttrStoreSes, 
TypStoreSes);
+            var taskSesCollector = new Collector(TypAttrTaskSes, TypTaskSes);
 
             Type curType = type;
 
             while (curType != null)
             {
-                CreateInjectors(curType, gridCollector, storeSesCollector);
+                CreateInjectors(curType, gridCollector, storeSesCollector, 
taskSesCollector);
 
                 curType = curType.BaseType;
             }
 
             _igniteInjectors = gridCollector.Injectors;
             _storeSesInjectors = storeSesCollector.Injectors;
+            _taskSesInjectors = taskSesCollector.Injectors;
 
             _taskNoResCache = ContainsAttribute(type, 
TypComputeTaskNoResCache, true);
+            TaskSessionFullSupport = ContainsAttribute(type, 
TypComputeTaskSessionFullSupport, true);
         }
 
         /// <summary>
@@ -102,6 +117,19 @@ namespace Apache.Ignite.Core.Impl.Resource
             Inject0(target, ses, _storeSesInjectors);
         }
 
+        /// <summary>
+        /// Inject compute task session.
+        /// </summary>
+        /// <param name="target">Target.</param>
+        /// <param name="ses">Compute task session.</param>
+        public void InjectTaskSession(object target, IComputeTaskSession ses)
+        {
+            if (ses != null)
+            {
+                Inject0(target, ses, _taskSesInjectors);
+            }
+        }
+
         /// <summary>
         /// Perform injection.
         /// </summary>
@@ -127,7 +155,10 @@ namespace Apache.Ignite.Core.Impl.Resource
                 return _taskNoResCache;
             }
         }
-        
+
+        /// <inheritdoc cref="ComputeTaskSessionFullSupportAttribute"/>
+        public bool TaskSessionFullSupport { get; }
+
         /// <summary>
         /// Create gridInjectors for the given type.
         /// </summary>
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
index b2fcc96b009..6c5dde10661 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
@@ -493,11 +493,12 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
 
         #region IMPLEMENTATION: COMPUTE
 
-        private long ComputeTaskMap(long memPtr)
+        private long ComputeTaskMap(long memPtr, long ignored, long ignored2, 
void* sesPtr)
         {
             using (PlatformMemoryStream stream = 
IgniteManager.Memory.Get(memPtr).GetStream())
             {
-                Task(stream.ReadLong()).Map(stream);
+                var ses = TaskSession(sesPtr);
+                Task(stream.ReadLong()).Map(stream, ses);
 
                 return 0;
             }
@@ -562,15 +563,18 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
             }
         }
 
-        private long ComputeJobExecuteLocal(long jobPtr, long cancel, long 
unused, void* arg)
+        private long ComputeJobExecuteLocal(long jobPtr, long cancel, long 
unused, void* sesPtr)
         {
-            Job(jobPtr).ExecuteLocal(cancel == 1);
+            var ses = TaskSession(sesPtr);
+
+            Job(jobPtr).ExecuteLocal(cancel == 1, ses);
 
             return 0;
         }
 
-        private long ComputeJobExecute(long memPtr)
+        private long ComputeJobExecute(long memPtr, long ignored, long 
ignored2, void* sesPtr)
         {
+            var ses = TaskSession(sesPtr);
             using (PlatformMemoryStream stream = 
IgniteManager.Memory.Get(memPtr).GetStream())
             {
                 var job = Job(stream.ReadLong());
@@ -579,7 +583,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
 
                 stream.Reset();
 
-                job.ExecuteRemote(stream, cancel);
+                job.ExecuteRemote(stream, cancel, ses);
             }
 
             return 0;
@@ -615,6 +619,18 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
             return _handleRegistry.Get<IComputeTaskHolder>(taskPtr);
         }
 
+        private IComputeTaskSession TaskSession(void* sesPtr)
+        {
+            if (sesPtr == null)
+            {
+                return null;
+            }
+            
+            var sesRef = _jvm.AttachCurrentThread().NewGlobalRef((IntPtr) 
sesPtr);
+            var sesTarget = new PlatformJniTarget(sesRef, _ignite.Marshaller);
+            return new ComputeTaskSession(sesTarget);
+        }
+
         /// <summary>
         /// Get compute job using it's GC handle pointer.
         /// </summary>
@@ -640,7 +656,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
                 stream.Reset();
 
                 var invoker = 
DelegateTypeDescriptor.GetComputeOutFunc(func.GetType());
-                ComputeRunner.ExecuteJobAndWriteResults(_ignite, stream, func, 
invoker);
+                ComputeRunner.ExecuteJobAndWriteResults(_ignite, null, stream, 
func, invoker);
             }
 
             return 0;
@@ -660,7 +676,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
 
                 stream.Reset();
 
-                ComputeRunner.ExecuteJobAndWriteResults(_ignite, stream, 
action, act =>
+                ComputeRunner.ExecuteJobAndWriteResults(_ignite, null, stream, 
action, act =>
                 {
                     act.Invoke();
                     return null;
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Core/Resource/TaskSessionResourceAttribute.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Core/Resource/TaskSessionResourceAttribute.cs
new file mode 100644
index 00000000000..905bb271deb
--- /dev/null
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Core/Resource/TaskSessionResourceAttribute.cs
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Resource
+{
+    using System;
+    using Compute;
+    using static System.AttributeTargets;
+
+    /// <summary>
+    /// Injects <see cref="IComputeTaskSession"/> into implementations of
+    /// <see cref="IComputeTask{TJobRes,TReduceRes}"/> and <see 
cref="IComputeJob{TRes}"/>.
+    /// </summary>
+    [AttributeUsage(Field | Method | Property)]
+    public sealed class TaskSessionResourceAttribute : Attribute
+    {
+        // No-op.
+    }
+}

Reply via email to