This is an automated email from the ASF dual-hosted git repository.
mpetrov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 3d72ec46b59 IGNITE-19030 Platforms: Add authorization of Compute tasks
(#10595)
3d72ec46b59 is described below
commit 3d72ec46b59a58e85312f73efc2a7c720924ffc0
Author: Mikhail Petrov <[email protected]>
AuthorDate: Thu Mar 23 16:36:51 2023 +0300
IGNITE-19030 Platforms: Add authorization of Compute tasks (#10595)
---
...tformJob.java => PlatformSecurityAwareJob.java} | 23 +-
.../processors/platform/PlatformContext.java | 6 +-
.../processors/platform/PlatformContextImpl.java | 8 +-
.../platform/compute/PlatformAbstractFunc.java | 15 +-
.../platform/compute/PlatformAbstractJob.java | 12 +-
.../PlatformBroadcastingMultiClosureTask.java | 2 +-
.../PlatformBroadcastingSingleClosureTask.java | 2 +-
.../platform/compute/PlatformCallable.java | 4 +-
.../platform/compute/PlatformClosureJob.java | 6 +-
.../platform/compute/PlatformCompute.java | 17 +-
.../platform/compute/PlatformFullJob.java | 7 +-
.../platform/compute/PlatformFullTask.java | 9 +-
.../processors/platform/compute/PlatformJob.java | 3 +-
.../platform/compute/PlatformRunnable.java | 5 +-
.../internal/processors/task/GridTaskWorker.java | 5 +
modules/platforms/cpp/core-test/CMakeLists.txt | 1 +
.../cpp/core-test/config/compute-security.xml | 87 +++++
.../cpp/core-test/src/compute_security_test.cpp | 157 ++++++++
.../include/ignite/impl/compute/compute_impl.h | 11 +
.../Apache.Ignite.Core.Tests.DotNetCore.csproj | 1 +
.../Compute/ComputeSecurityPermissionsTest.cs | 398 +++++++++++++++++++++
.../Config/Compute/compute-security.xml | 70 ++++
.../Impl/Compute/Closure/ComputeActionJob.cs | 7 +
.../Impl/Compute/Closure/ComputeFuncJob.cs | 6 +
.../Impl/Compute/Closure/ComputeOutFuncJob.cs | 7 +
.../Apache.Ignite.Core/Impl/Compute/ComputeFunc.cs | 10 +-
.../Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs | 7 +-
.../Apache.Ignite.Core/Impl/Compute/ComputeJob.cs | 10 +-
.../Impl/Compute/ComputeOutFunc.cs | 10 +-
.../dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs | 8 +
30 files changed, 867 insertions(+), 47 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformJob.java
b/modules/core/src/main/java/org/apache/ignite/internal/PlatformSecurityAwareJob.java
similarity index 57%
copy from
modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformJob.java
copy to
modules/core/src/main/java/org/apache/ignite/internal/PlatformSecurityAwareJob.java
index 66f8adb7d58..563b3b73e3b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformJob.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/PlatformSecurityAwareJob.java
@@ -15,25 +15,18 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.platform.compute;
+package org.apache.ignite.internal;
-import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.plugin.security.SecurityPermission;
/**
- * Platform closure job interface.
+ * Represents the base interface for all Platform Compute Jobs that wrap and
execute user code. The execution and
+ * cancellation of tasks marked with this interface will be preceded by
authorization with the specified name and
+ * {@link SecurityPermission#TASK_EXECUTE} / {@link
SecurityPermission#TASK_CANCEL} permission.
*/
-public interface PlatformJob extends ComputeJob {
+public interface PlatformSecurityAwareJob {
/**
- * Gets native pointer to deployed job.
- *
- * @return Pointer.
+ * @return The name of the Platform Compute Job that will be used when
authorizing its start and cancellation.
*/
- public long pointer();
-
- /**
- * Gets native job.
- *
- * @return Native job.
- */
- public Object job();
+ public String name();
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
index d2d363ecec8..f441d88103a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
@@ -234,9 +234,10 @@ public interface PlatformContext {
* @param task Task.
* @param ptr Pointer.
* @param job Native job.
+ * @param jobName Native job name.
* @return job.
*/
- public PlatformJob createJob(Object task, long ptr, @Nullable Object job);
+ public PlatformJob createJob(Object task, long ptr, @Nullable Object job,
String jobName);
/**
* Create closure job.
@@ -244,9 +245,10 @@ public interface PlatformContext {
* @param task Native task.
* @param ptr Pointer.
* @param job Native job.
+ * @param jobName Closure name.
* @return Closure job.
*/
- public PlatformJob createClosureJob(Object task, long ptr, Object job);
+ public PlatformJob createClosureJob(Object task, long ptr, Object job,
String jobName);
/**
* Create cache entry processor.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java
index 553d6692d8e..81b5af96376 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java
@@ -558,13 +558,13 @@ public class PlatformContextImpl implements
PlatformContext, PartitionsExchangeA
}
/** {@inheritDoc} */
- @Override public PlatformJob createJob(Object task, long ptr, @Nullable
Object job) {
- return new PlatformFullJob(this, (PlatformAbstractTask)task, ptr, job);
+ @Override public PlatformJob createJob(Object task, long ptr, @Nullable
Object job, String jobName) {
+ return new PlatformFullJob(this, (PlatformAbstractTask)task, ptr, job,
jobName);
}
/** {@inheritDoc} */
- @Override public PlatformJob createClosureJob(Object task, long ptr,
Object job) {
- return new PlatformClosureJob((PlatformAbstractTask)task, ptr, job);
+ @Override public PlatformJob createClosureJob(Object task, long ptr,
Object job, String jobName) {
+ return new PlatformClosureJob((PlatformAbstractTask)task, ptr, job,
jobName);
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractFunc.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractFunc.java
index f5d6d86083e..6880508d55e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractFunc.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractFunc.java
@@ -20,6 +20,7 @@ package
org.apache.ignite.internal.processors.platform.compute;
import java.io.Serializable;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.PlatformSecurityAwareJob;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import
org.apache.ignite.internal.processors.platform.callback.PlatformCallbackGateway;
import
org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
@@ -33,7 +34,7 @@ import org.apache.ignite.resources.IgniteInstanceResource;
* Cleaner alternative to {@link PlatformClosureJob}, uses less wrapping for
the underlying object,
* and a single callback.
*/
-public abstract class PlatformAbstractFunc implements Serializable {
+public abstract class PlatformAbstractFunc implements
PlatformSecurityAwareJob, Serializable {
/** */
private static final long serialVersionUID = 0L;
@@ -48,17 +49,22 @@ public abstract class PlatformAbstractFunc implements
Serializable {
@IgniteInstanceResource
protected transient Ignite ignite;
+ /** Platform function name. */
+ private final String funcName;
+
/**
* Constructor.
*
* @param func Platform func.
* @param ptr Handle for local execution.
+ * @param funcName Platform function name.
*/
- protected PlatformAbstractFunc(Object func, long ptr) {
+ protected PlatformAbstractFunc(Object func, long ptr, String funcName) {
this.ptr = ptr;
assert func != null;
this.func = func;
+ this.funcName = funcName;
}
/**
@@ -93,6 +99,11 @@ public abstract class PlatformAbstractFunc implements
Serializable {
}
}
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return funcName;
+ }
+
/**
* Performs platform callback.
*
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
index 771866eb30c..fbde0d8aa9b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
@@ -50,6 +50,9 @@ public abstract class PlatformAbstractJob implements
PlatformJob, Externalizable
/** Job. */
protected Object job;
+ /** Job name. */
+ protected String jobName;
+
/**
* {@link java.io.Externalizable} support.
*/
@@ -63,11 +66,13 @@ public abstract class PlatformAbstractJob implements
PlatformJob, Externalizable
* @param task Parent task.
* @param ptr Pointer.
* @param job Job.
+ * @param jobName Job name.
*/
- protected PlatformAbstractJob(PlatformAbstractTask task, long ptr, Object
job) {
+ protected PlatformAbstractJob(PlatformAbstractTask task, long ptr, Object
job, String jobName) {
this.task = task;
this.ptr = ptr;
this.job = job;
+ this.jobName = jobName;
}
/** {@inheritDoc} */
@@ -153,4 +158,9 @@ public abstract class PlatformAbstractJob implements
PlatformJob, Externalizable
@Override public Object job() {
return job;
}
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return jobName;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingMultiClosureTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingMultiClosureTask.java
index 1c1605beaa4..d1771f7072c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingMultiClosureTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingMultiClosureTask.java
@@ -69,7 +69,7 @@ public class PlatformBroadcastingMultiClosureTask extends
PlatformAbstractTask {
first = false;
}
else
- map.put(ctx.createClosureJob(this, job.pointer(),
job.job()), node);
+ map.put(ctx.createClosureJob(this, job.pointer(),
job.job(), job.name()), node);
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingSingleClosureTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingSingleClosureTask.java
index 71818b0fef7..613d69ab8f3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingSingleClosureTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingSingleClosureTask.java
@@ -67,7 +67,7 @@ public class PlatformBroadcastingSingleClosureTask extends
PlatformAbstractTask
first = false;
}
else
- map.put(ctx.createClosureJob(this, job.pointer(),
job.job()), node);
+ map.put(ctx.createClosureJob(this, job.pointer(),
job.job(), job.name()), node);
}
return map;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCallable.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCallable.java
index faae32ea319..edb279dfe50 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCallable.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCallable.java
@@ -34,8 +34,8 @@ public class PlatformCallable extends PlatformAbstractFunc
implements IgniteCall
* @param func Platform func.
* @param ptr Handle for local execution.
*/
- public PlatformCallable(Object func, long ptr) {
- super(func, ptr);
+ public PlatformCallable(Object func, long ptr, String name) {
+ super(func, ptr, name);
}
/** <inheritdoc /> */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
index 674736534f9..3bcb54995bf 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
@@ -53,8 +53,8 @@ public class PlatformClosureJob extends PlatformAbstractJob {
* @param ptr Job pointer.
* @param job Job.
*/
- public PlatformClosureJob(PlatformAbstractTask task, long ptr, Object job)
{
- super(task, ptr, job);
+ public PlatformClosureJob(PlatformAbstractTask task, long ptr, Object job,
String jobName) {
+ super(task, ptr, job, jobName);
}
/** {@inheritDoc} */
@@ -105,10 +105,12 @@ public class PlatformClosureJob extends
PlatformAbstractJob {
assert job != null;
out.writeObject(job);
+ out.writeObject(jobName);
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
job = in.readObject();
+ jobName = (String)in.readObject();
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
index 1b832e08576..b30ba5ca6b0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
@@ -160,8 +160,9 @@ public class PlatformCompute extends PlatformAbstractTarget
{
case OP_EXEC_NATIVE: {
long taskPtr = reader.readLong();
long topVer = reader.readLong();
+ String taskName = reader.readString();
- final PlatformFullTask task = new
PlatformFullTask(platformCtx, platformGrp, taskPtr, topVer);
+ final PlatformFullTask task = new
PlatformFullTask(platformCtx, platformGrp, taskPtr, topVer, taskName);
return executeNative0(task);
}
@@ -186,8 +187,9 @@ public class PlatformCompute extends PlatformAbstractTarget
{
int part = reader.readInt();
Object func = reader.readObjectDetached();
long ptr = reader.readLong();
+ String funcName = reader.readString();
- PlatformCallable callable = new PlatformCallable(func, ptr);
+ PlatformCallable callable = new PlatformCallable(func, ptr,
funcName);
IgniteInternalFuture future =
compute.affinityCallAsync(cacheNames, part, callable);
@@ -199,8 +201,9 @@ public class PlatformCompute extends PlatformAbstractTarget
{
Object key = reader.readObjectDetached();
Object func = reader.readObjectDetached();
long ptr = reader.readLong();
+ String callableName = reader.readString();
- PlatformCallable callable = new PlatformCallable(func, ptr);
+ PlatformCallable callable = new PlatformCallable(func, ptr,
callableName);
IgniteInternalFuture future =
compute.affinityCallAsync(Collections.singletonList(cacheName), key, callable);
@@ -212,8 +215,9 @@ public class PlatformCompute extends PlatformAbstractTarget
{
int part = reader.readInt();
Object func = reader.readObjectDetached();
long ptr = reader.readLong();
+ String runnableName = reader.readString();
- PlatformRunnable runnable = new PlatformRunnable(func, ptr);
+ PlatformRunnable runnable = new PlatformRunnable(func, ptr,
runnableName);
IgniteInternalFuture future =
compute.affinityRunAsync(cacheNames, part, runnable);
@@ -225,8 +229,9 @@ public class PlatformCompute extends PlatformAbstractTarget
{
Object key = reader.readObjectDetached();
Object func = reader.readObjectDetached();
long ptr = reader.readLong();
+ String runnableName = reader.readString();
- PlatformRunnable runnable = new PlatformRunnable(func, ptr);
+ PlatformRunnable runnable = new PlatformRunnable(func, ptr,
runnableName);
IgniteInternalFuture future =
compute.affinityRunAsync(Collections.singleton(cacheName), key, runnable);
@@ -319,7 +324,7 @@ public class PlatformCompute extends PlatformAbstractTarget
{
* @return Closure job.
*/
private PlatformJob nextClosureJob(PlatformAbstractTask task,
BinaryRawReaderEx reader) {
- return platformCtx.createClosureJob(task, reader.readLong(),
reader.readObjectDetached());
+ return platformCtx.createClosureJob(task, reader.readLong(),
reader.readObjectDetached(), reader.readString());
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
index 160392d0571..4bf3b2f9d70 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
@@ -78,9 +78,10 @@ public class PlatformFullJob extends PlatformAbstractJob {
* @param task Parent task.
* @param ptr Job pointer.
* @param job Job.
+ * @param jobName Job name.
*/
- public PlatformFullJob(PlatformContext ctx, PlatformAbstractTask task,
long ptr, Object job) {
- super(task, ptr, job);
+ public PlatformFullJob(PlatformContext ctx, PlatformAbstractTask task,
long ptr, Object job, String jobName) {
+ super(task, ptr, job, jobName);
this.ctx = ctx;
}
@@ -196,11 +197,13 @@ public class PlatformFullJob extends PlatformAbstractJob {
assert job != null;
out.writeObject(job);
+ out.writeObject(jobName);
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
job = in.readObject();
+ jobName = (String)in.readObject();
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
index 59dfd51b7bb..fd799b6b46f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
@@ -52,6 +52,9 @@ public final class PlatformFullTask extends
PlatformAbstractTask {
/** Cluster group. */
private final ClusterGroup grp;
+ /** Platform task name. */
+ private final String taskName;
+
/**
* Constructor.
*
@@ -59,12 +62,14 @@ public final class PlatformFullTask extends
PlatformAbstractTask {
* @param grp Cluster group.
* @param taskPtr Pointer to the task in the native platform.
* @param topVer Initial topology version.
+ * @param taskName Task name.
*/
- public PlatformFullTask(PlatformContext ctx, ClusterGroup grp, long
taskPtr, long topVer) {
+ public PlatformFullTask(PlatformContext ctx, ClusterGroup grp, long
taskPtr, long topVer, String taskName) {
super(ctx, taskPtr);
this.grp = grp;
this.topVer = topVer;
+ this.taskName = taskName;
}
/** {@inheritDoc} */
@@ -160,7 +165,7 @@ public final class PlatformFullTask extends
PlatformAbstractTask {
Object nativeJob = reader.readBoolean() ?
reader.readObjectDetached() : null;
- PlatformJob job = ctx.createJob(this, ptr, nativeJob);
+ PlatformJob job = ctx.createJob(this, ptr, nativeJob,
taskName);
UUID jobNodeId = reader.readUuid();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformJob.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformJob.java
index 66f8adb7d58..9bda9ddca12 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformJob.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformJob.java
@@ -18,11 +18,12 @@
package org.apache.ignite.internal.processors.platform.compute;
import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.internal.PlatformSecurityAwareJob;
/**
* Platform closure job interface.
*/
-public interface PlatformJob extends ComputeJob {
+public interface PlatformJob extends ComputeJob, PlatformSecurityAwareJob {
/**
* Gets native pointer to deployed job.
*
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformRunnable.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformRunnable.java
index c389c7cb969..93bd00038b7 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformRunnable.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformRunnable.java
@@ -34,9 +34,10 @@ public class PlatformRunnable extends PlatformAbstractFunc
implements IgniteRunn
*
* @param func Platform func.
* @param ptr Handle for local execution.
+ * @param name Platform function name.
*/
- public PlatformRunnable(Object func, long ptr) {
- super(func, ptr);
+ public PlatformRunnable(Object func, long ptr, String name) {
+ super(func, ptr, name);
}
/** <inheritdoc /> */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 5c5f71889a3..94805dcbcbf 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -63,6 +63,7 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTaskSessionImpl;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.PlatformSecurityAwareJob;
import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.compute.ComputeTaskCancelledCheckedException;
@@ -1774,6 +1775,8 @@ public class GridTaskWorker<T, R> extends GridWorker
implements GridTimeoutObjec
ctx.security().authorize(executable.getClass().getName(),
TASK_EXECUTE);
}
+ else if (executable instanceof PlatformSecurityAwareJob)
+
ctx.security().authorize(((PlatformSecurityAwareJob)executable).name(),
TASK_EXECUTE);
else if (executable instanceof PublicAccessJob)
authorizeAll(ctx.security(),
((PublicAccessJob)executable).requiredPermissions());
else if (opts.isPublicRequest()) {
@@ -1800,6 +1803,8 @@ public class GridTaskWorker<T, R> extends GridWorker
implements GridTimeoutObjec
if (!ctx.security().isSystemType(executable.getClass()))
ctx.security().authorize(executable.getClass().getName(),
TASK_CANCEL);
+ else if (executable instanceof PlatformSecurityAwareJob)
+
ctx.security().authorize(((PlatformSecurityAwareJob)executable).name(),
TASK_CANCEL);
else if (!isClosedByInitiator)
ctx.security().authorize(ADMIN_KILL);
}
diff --git a/modules/platforms/cpp/core-test/CMakeLists.txt
b/modules/platforms/cpp/core-test/CMakeLists.txt
index 188716eec2f..658ad8df0f9 100644
--- a/modules/platforms/cpp/core-test/CMakeLists.txt
+++ b/modules/platforms/cpp/core-test/CMakeLists.txt
@@ -39,6 +39,7 @@ set(SOURCES
src/concurrent_test.cpp
src/compute_test.cpp
src/compute_java_test.cpp
+ src/compute_security_test.cpp
src/ignition_test.cpp
src/interop_memory_test.cpp
src/interop_test.cpp
diff --git a/modules/platforms/cpp/core-test/config/compute-security.xml
b/modules/platforms/cpp/core-test/config/compute-security.xml
new file mode 100644
index 00000000000..e156fe4454f
--- /dev/null
+++ b/modules/platforms/cpp/core-test/config/compute-security.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation=
+ "http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ https://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="ignite.cfg"
class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="localHost" value="127.0.0.1"/>
+
+ <property name="discoverySpi">
+ <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+ <property name="ipFinder">
+ <bean
class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+ <property name="shared" value="false"/>
+ <property name="addresses">
+ <list>
+ <value>127.0.0.1:47500</value>
+ </list>
+ </property>
+ </bean>
+ </property>
+ </bean>
+ </property>
+
+ <property name="cacheConfiguration">
+ <list>
+ <bean
class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="name" value="default"/>
+ </bean>
+ </list>
+ </property>
+
+ <property name="pluginProviders">
+ <list>
+ <bean
class="org.apache.ignite.internal.processors.security.impl.TestSecurityPluginProvider">
+ <constructor-arg name="login" value="login" />
+ <constructor-arg name="pwd" value="pwd" />
+ <constructor-arg name="perms">
+ <bean
class="org.apache.ignite.plugin.security.SecurityBasicPermissionSet">
+ <property name="systemPermissions">
+ <util:list
value-type="org.apache.ignite.plugin.security.SecurityPermission">
+ <value>JOIN_AS_SERVER</value>
+ <value>CACHE_CREATE</value>
+ </util:list>
+ </property>
+ <property name="taskPermissions">
+ <util:map map-class="java.util.HashMap"
key-type="java.lang.String" value-type="java.util.Collection">
+ <entry key="AllowedCallable"
value-ref="task-execute-permission"/>
+ <entry key="AllowedRunnable"
value-ref="task-execute-permission"/>
+ </util:map>
+ </property>
+ </bean>
+ </constructor-arg>
+ <constructor-arg name="globalAuth" value="false" />
+ <constructor-arg name="clientData">
+ <list
value-type="org.apache.ignite.internal.processors.security.impl.TestSecurityData"/>
+ </constructor-arg>
+ </bean>
+ </list>
+ </property>
+ </bean>
+
+ <util:list id="task-execute-permission"
value-type="org.apache.ignite.plugin.security.SecurityPermission">
+ <value>TASK_EXECUTE</value>
+ </util:list>
+</beans>
diff --git a/modules/platforms/cpp/core-test/src/compute_security_test.cpp
b/modules/platforms/cpp/core-test/src/compute_security_test.cpp
new file mode 100644
index 00000000000..18257facaf9
--- /dev/null
+++ b/modules/platforms/cpp/core-test/src/compute_security_test.cpp
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <boost/test/unit_test.hpp>
+#include <boost/chrono.hpp>
+
+#include <ignite/ignition.h>
+#include <ignite/test_utils.h>
+
+using namespace ignite;
+using namespace ignite::cache;
+using namespace ignite::cluster;
+using namespace ignite::compute;
+using namespace ignite::common::concurrent;
+using namespace ignite::impl;
+using namespace ignite_test;
+
+using namespace boost::unit_test;
+
+/** */
+struct ComputeSecurityTestSuiteFixture {
+ Ignite node;
+
+ ComputeSecurityTestSuiteFixture() : node(StartNode("compute-security.xml",
"test-node")) {
+ // No-op.
+ }
+
+ ~ComputeSecurityTestSuiteFixture()
+ {
+ Ignition::StopAll(true);
+ }
+};
+
+struct AbstractCallable : ComputeFunc<int> {
+ virtual int Call() {
+ return 42;
+ }
+};
+
+struct AllowedCallable : AbstractCallable { };
+struct ForbiddenCallable : AbstractCallable { };
+
+struct AbstractRunnable : ComputeFunc<void> {
+ virtual void Call() {
+ // No-op.
+ }
+};
+
+struct AllowedRunnable : AbstractRunnable { };
+struct ForbiddenRunnable : AbstractRunnable { };
+
+namespace ignite {
+ namespace binary {
+ template<>
+ struct BinaryType<AllowedCallable> :
BinaryTypeDefaultAll<AllowedCallable> {
+ static void GetTypeName(std::string &dst) {
+ dst = "AllowedCallable";
+ }
+
+ static void Write(BinaryWriter&, const AllowedCallable&) {}
+
+ static void Read(BinaryReader&, AllowedCallable&) {}
+ };
+
+ template<>
+ struct BinaryType<ForbiddenCallable> :
BinaryTypeDefaultAll<ForbiddenCallable> {
+ static void GetTypeName(std::string &dst) {
+ dst = "ForbiddenCallable";
+ }
+
+ static void Write(BinaryWriter&, const ForbiddenCallable&) {}
+
+ static void Read(BinaryReader&, ForbiddenCallable&) {}
+ };
+
+ template<>
+ struct BinaryType<AllowedRunnable> :
BinaryTypeDefaultAll<AllowedRunnable> {
+ static void GetTypeName(std::string &dst) {
+ dst = "AllowedRunnable";
+ }
+
+ static void Write(BinaryWriter&, const AllowedRunnable&) {}
+
+ static void Read(BinaryReader&, AllowedRunnable&) {}
+ };
+
+ template<>
+ struct BinaryType<ForbiddenRunnable> :
BinaryTypeDefaultAll<ForbiddenRunnable> {
+ static void GetTypeName(std::string &dst) {
+ dst = "ForbiddenRunnable";
+ }
+
+ static void Write(BinaryWriter&, const ForbiddenRunnable&) {}
+
+ static void Read(BinaryReader&, ForbiddenRunnable&) {}
+ };
+ }
+}
+
+BOOST_FIXTURE_TEST_SUITE(ComputeSecurityTestSuite,
ComputeSecurityTestSuiteFixture)
+ // Currently we cannot validate security exception messages produced by
non "affinity" compute methods
+ // see https://issues.apache.org/jira/browse/IGNITE-19055
+ BOOST_AUTO_TEST_CASE(TestComputeSecurity) {
+ Compute compute = node.GetCompute();
+
+ BOOST_CHECK_EQUAL(42, compute.Call<int>(AllowedCallable()));
+ BOOST_CHECK_THROW(compute.Call<int>(ForbiddenCallable()), IgniteError);
+
+ BOOST_CHECK_EQUAL(42,
compute.CallAsync<int>(AllowedCallable()).GetValue());
+
BOOST_CHECK_THROW(compute.CallAsync<int>(ForbiddenCallable()).GetValue(),
IgniteError);
+
+ BOOST_CHECK_EQUAL(42, compute.AffinityCall<int>("default", 0,
AllowedCallable()));
+ BOOST_CHECK_THROW(compute.AffinityCall<int>("default", 0,
ForbiddenCallable()), IgniteError);
+
+ BOOST_CHECK_EQUAL(42, compute.AffinityCallAsync<int>("default", 0,
AllowedCallable()).GetValue());
+ BOOST_CHECK_THROW(compute.AffinityCallAsync<int>("default", 0,
ForbiddenCallable()).GetValue(), IgniteError);
+
+ compute.Run(AllowedRunnable());
+ BOOST_CHECK_THROW(compute.Run(ForbiddenRunnable()), IgniteError);
+
+ compute.RunAsync(AllowedRunnable()).GetValue();
+ BOOST_CHECK_THROW(compute.RunAsync(ForbiddenRunnable()).GetValue(),
IgniteError);
+
+ compute.AffinityRun("default", 0, AllowedRunnable());
+ BOOST_CHECK_THROW(compute.AffinityRun("default", 0,
ForbiddenRunnable()), IgniteError);
+
+ compute.AffinityRunAsync("default", 0, AllowedRunnable()).GetValue();
+ BOOST_CHECK_THROW(compute.AffinityRunAsync("default", 0,
ForbiddenRunnable()).GetValue(), IgniteError);
+
+ BOOST_CHECK_EQUAL(42,
compute.Broadcast<int>(AllowedCallable()).front());
+ BOOST_CHECK_THROW(compute.Broadcast<int>(ForbiddenCallable()),
IgniteError);
+
+ BOOST_CHECK_EQUAL(42,
compute.BroadcastAsync<int>(AllowedCallable()).GetValue().front());
+
BOOST_CHECK_THROW(compute.BroadcastAsync<int>(ForbiddenCallable()).GetValue(),
IgniteError);
+
+ compute.Broadcast(AllowedRunnable());
+ BOOST_CHECK_THROW(compute.Broadcast(ForbiddenRunnable()), IgniteError);
+
+ compute.BroadcastAsync(AllowedRunnable()).GetValue();
+
BOOST_CHECK_THROW(compute.BroadcastAsync(ForbiddenRunnable()).GetValue(),
IgniteError);
+ }
+
+BOOST_AUTO_TEST_SUITE_END()
diff --git
a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h
b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h
index bd07ea4d743..1cdf8001af4 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h
@@ -477,6 +477,11 @@ namespace ignite
writer.WriteInt64(jobHandle);
writer.WriteObject<F>(func);
+ std::string typeName;
+ ignite::binary::BinaryType<F>::GetTypeName(typeName);
+
+ writer.WriteString(typeName);
+
out.Synchronize();
IgniteError err;
@@ -529,6 +534,12 @@ namespace ignite
writer.WriteObject<K>(key);
writer.WriteObject<F>(func);
writer.WriteInt64(jobHandle);
+
+ std::string typeName;
+ ignite::binary::BinaryType<F>::GetTypeName(typeName);
+
+ writer.WriteString(typeName);
+
writer.WriteInt64(taskHandle);
writer.WriteInt32(TYP_OBJ);
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.DotNetCore.csproj
b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.DotNetCore.csproj
index ca06706e2e7..e8877c5f3c3 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.DotNetCore.csproj
+++
b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.DotNetCore.csproj
@@ -83,6 +83,7 @@
<None Update="Config\Compute\compute-standalone.xml"
CopyToOutputDirectory="PreserveNewest" />
<None Update="Config\Compute\compute-grid2.xml"
CopyToOutputDirectory="PreserveNewest" />
<None Update="Config\Compute\compute-grid3.xml"
CopyToOutputDirectory="PreserveNewest" />
+ <None Update="Config\Compute\compute-security.xml"
CopyToOutputDirectory="PreserveNewest" />
<None Update="Config\Dynamic\dynamic-data-no-cfg.xml"
CopyToOutputDirectory="PreserveNewest" />
<None Update="Config\Dynamic\dynamic-client.xml"
CopyToOutputDirectory="PreserveNewest" />
<None Update="Config\Dynamic\dynamic-data.xml"
CopyToOutputDirectory="PreserveNewest" />
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeSecurityPermissionsTest.cs
b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeSecurityPermissionsTest.cs
new file mode 100644
index 00000000000..7bbd3fdbba3
--- /dev/null
+++
b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeSecurityPermissionsTest.cs
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Compute
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Linq;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Apache.Ignite.Core.Cluster;
+ using Apache.Ignite.Core.Compute;
+ using NUnit.Framework;
+ using static AbstractTask.Job;
+ using static ComputeSecurityPermissionsTest;
+
+ /// <summary>
+ /// Tests authorization of DotNet native Compute tasks execution.
+ /// </summary>
+ public class ComputeSecurityPermissionsTest
+ {
+ private const string CacheName = "DEFAULT_CACHE_NAME";
+
+ public static int ExecutedJobCounter;
+
+ public static int CancelledJobCounter;
+
+ private IIgnite _grid;
+
+ [TestFixtureSetUp]
+ public void FixtureSetUp()
+ {
+ var cfg = new IgniteConfiguration(TestUtils.GetTestConfiguration())
+ {
+ SpringConfigUrl = @"Config/Compute/compute-security.xml",
+ };
+
+ _grid = Ignition.Start(cfg);
+
+ _grid.CreateCache<object, object>(CacheName);
+ }
+
+ [TestFixtureTearDown]
+ public void FixtureTearDown()
+ {
+ Ignition.StopAll(true);
+ }
+
+ [Test]
+ public void TestComputeSecurityExecutePermission()
+ {
+ CheckTask((task, ct) => _grid.GetCompute().Execute(task, null));
+ CheckTask((task, ct) => _grid.GetCompute().ExecuteAsync(task,
null).GetResult());
+ CheckTask((task, ct) => _grid.GetCompute().ExecuteAsync(task,
null, ct).GetResult());
+
+ CheckTask((task, ct) => _grid.GetCompute().Execute(task));
+ CheckTask((task, ct) =>
_grid.GetCompute().ExecuteAsync(task).GetResult());
+ CheckTask((task, ct) => _grid.GetCompute().ExecuteAsync(task,
ct).GetResult());
+
+ CheckTask((task, ct) => _grid.GetCompute().Execute<object, int,
int>(task.GetType(), null));
+ CheckTask((task, ct) => _grid.GetCompute().ExecuteAsync<object,
int, int>(task.GetType(), null).GetResult());
+ CheckTask((task, ct) => _grid.GetCompute().ExecuteAsync<object,
int, int>(task.GetType(), null, ct).GetResult());
+
+ CheckTask((task, ct) => _grid.GetCompute().Execute<int,
int>(task.GetType()));
+ CheckTask((task, ct) => _grid.GetCompute().ExecuteAsync<int,
int>(task.GetType()).GetResult());
+ CheckTask((task, ct) => _grid.GetCompute().ExecuteAsync<int,
int>(task.GetType(), ct).GetResult());
+
+ CheckCallable((func, ct) => _grid.GetCompute().Call(func));
+ CheckCallable((func, ct) =>
_grid.GetCompute().CallAsync(func).GetResult());
+ CheckCallable((func, ct) => _grid.GetCompute().CallAsync(func,
ct).GetResult());
+
+ CheckCallable((func, ct) =>
_grid.GetCompute().AffinityCall(CacheName, 0, func));
+ CheckCallable((func, ct) =>
_grid.GetCompute().AffinityCallAsync(CacheName, 0, func).GetResult());
+ CheckCallable((func, ct) =>
_grid.GetCompute().AffinityCallAsync(CacheName, 0, func, ct).GetResult());
+
+ CheckCallable((func, ct) => _grid.GetCompute().AffinityCall(new[]
{ CacheName }, 0, func));
+ CheckCallable((func, ct) =>
_grid.GetCompute().AffinityCallAsync(new[] { CacheName }, 0, func).GetResult());
+ CheckCallable((func, ct) =>
_grid.GetCompute().AffinityCallAsync(new[] { CacheName }, 0, func,
ct).GetResult());
+
+ CheckCallables((callables, ct) =>
_grid.GetCompute().Call(callables, new TestReducer()));
+ CheckCallables((callables, ct) =>
_grid.GetCompute().CallAsync(callables, new TestReducer()).GetResult());
+ CheckCallables((callables, ct) =>
_grid.GetCompute().CallAsync(callables, new TestReducer(), ct).GetResult());
+
+ CheckCallables((callables, ct) =>
_grid.GetCompute().Call(callables));
+ CheckCallables((callables, ct) =>
_grid.GetCompute().CallAsync(callables).GetResult());
+ CheckCallables((callables, ct) =>
_grid.GetCompute().CallAsync(callables, ct).GetResult());
+
+ CheckCallable((callable, ct) =>
_grid.GetCompute().Broadcast(callable));
+ CheckCallable((callable, ct) =>
_grid.GetCompute().BroadcastAsync(callable).GetResult());
+ CheckCallable((callable, ct) =>
_grid.GetCompute().BroadcastAsync(callable, ct).GetResult());
+
+ CheckFunction((func, ct) => _grid.GetCompute().Broadcast(func, 0));
+ CheckFunction((func, ct) =>
_grid.GetCompute().BroadcastAsync(func, 0).GetResult());
+ CheckFunction((func, ct) =>
_grid.GetCompute().BroadcastAsync(func, 0, ct).GetResult());
+
+ CheckAction((action, ct) => _grid.GetCompute().Broadcast(action));
+ CheckAction((action, ct) =>
_grid.GetCompute().BroadcastAsync(action).Wait());
+ CheckAction((action, ct) =>
_grid.GetCompute().BroadcastAsync(action, ct).Wait());
+
+ CheckAction((action, ct) => _grid.GetCompute().Run(action));
+ CheckAction((action, ct) =>
_grid.GetCompute().RunAsync(action).Wait());
+ CheckAction((action, ct) => _grid.GetCompute().RunAsync(action,
ct).Wait());
+
+ CheckAction((action, ct) =>
_grid.GetCompute().AffinityRun(CacheName, 0, action));
+ CheckAction((action, ct) =>
_grid.GetCompute().AffinityRunAsync(CacheName, 0, action).Wait());
+ CheckAction((action, ct) =>
_grid.GetCompute().AffinityRunAsync(CacheName, 0, action, ct).Wait());
+
+ CheckAction((action, ct) => _grid.GetCompute().AffinityRun(new[] {
CacheName }, 0, action));
+ CheckAction((action, ct) =>
_grid.GetCompute().AffinityRunAsync(new[] { CacheName }, 0, action).Wait());
+ CheckAction((action, ct) =>
_grid.GetCompute().AffinityRunAsync(new[] { CacheName }, 0, action, ct).Wait());
+
+ CheckActions((actions, ct) => _grid.GetCompute().Run(actions));
+ CheckActions((actions, ct) =>
_grid.GetCompute().RunAsync(actions).Wait());
+ CheckActions((actions, ct) => _grid.GetCompute().RunAsync(actions,
ct).Wait());
+
+ CheckFunction((func, ct) => _grid.GetCompute().Apply(func, 0));
+ CheckFunction((func, ct) => _grid.GetCompute().ApplyAsync(func,
0).GetResult());
+ CheckFunction((func, ct) => _grid.GetCompute().ApplyAsync(func, 0,
ct).GetResult());
+
+ CheckFunction((func, ct) => _grid.GetCompute().Apply(func, new[] {
0 }));
+ CheckFunction((func, ct) => _grid.GetCompute().ApplyAsync(func,
new[] { 0 }).GetResult());
+ CheckFunction((func, ct) => _grid.GetCompute().ApplyAsync(func,
new[] { 0 }, ct).GetResult());
+
+ CheckFunction((func, ct) => _grid.GetCompute().Apply(func, new[] {
0 }, new TestReducer()));
+ CheckFunction((func, ct) => _grid.GetCompute().ApplyAsync(func,
new[] { 0 }, new TestReducer()).GetResult());
+ CheckFunction((func, ct) => _grid.GetCompute().ApplyAsync(func,
new[] { 0 }, new TestReducer(), ct).GetResult());
+ }
+
+ [Test]
+ public void TestComputeTaskSecurityCancelPermission()
+ {
+ CheckTaskCancel((task, ct) =>
_grid.GetCompute().ExecuteAsync(task, null, ct));
+ CheckTaskCancel((task, ct) =>
_grid.GetCompute().ExecuteAsync(task, ct));
+ CheckTaskCancel((task, ct) =>
_grid.GetCompute().ExecuteAsync<object, int, int>(task.GetType(), null, ct));
+ CheckTaskCancel((task, ct) => _grid.GetCompute().ExecuteAsync<int,
int>(task.GetType(), ct));
+ }
+
+ private void CheckFunction(Action<IComputeFunc<int, int>,
CancellationToken> executor)
+ {
+ CheckExecutionSucceeded(token => executor(new
ExecuteAllowedFunction(), token));
+ CheckExecutionFailed(token => executor(new
ExecuteForbiddenFunction(), token));
+ }
+
+ private void CheckCallable(Action<IComputeFunc<int>,
CancellationToken> executor)
+ {
+ CheckExecutionSucceeded(token => executor(new
ExecuteAllowedCallable(), token));
+ CheckExecutionFailed(token => executor(new
ExecuteForbiddenCallable(), token));
+ }
+
+ private void CheckCallables(Action<IEnumerable<IComputeFunc<int>>,
CancellationToken> executor)
+ {
+ CheckExecutionSucceeded(token => executor(new[] { new
ExecuteAllowedCallable() }, token));
+ CheckExecutionFailed(token =>
+ executor(new IComputeFunc<int>[] { new
ExecuteAllowedCallable(), new ExecuteForbiddenCallable() }, token));
+ }
+
+ private void CheckAction(Action<IComputeAction, CancellationToken>
executor)
+ {
+ CheckExecutionSucceeded(token => executor(new
ExecuteAllowedAction(), token));
+ CheckExecutionFailed(token => executor(new
ExecuteForbiddenAction(), token));
+ }
+
+ private void CheckActions(Action<IEnumerable<IComputeAction>,
CancellationToken> executor)
+ {
+ CheckExecutionSucceeded(token => executor(new[] { new
ExecuteAllowedAction() }, token));
+ CheckExecutionFailed(token =>
+ executor(new IComputeAction[] { new ExecuteAllowedAction(),
new ExecuteForbiddenAction() }, token));
+ }
+
+ private void CheckTask(Action<IComputeTask<int, int>,
CancellationToken> executor)
+ {
+ CheckExecutionSucceeded(token => executor(new
ExecuteAllowedTask(), token));
+ CheckExecutionFailed(token => executor(new ExecuteForbiddenTask(),
token));
+ }
+
+ private void CheckTaskCancel(Func<IComputeTask<int, int>,
CancellationToken, Task<int>> executor)
+ {
+ CheckTaskCancelSucceeded(executor);
+ CheckTaskCancelFailed(executor);
+ }
+
+ private void CheckTaskCancelFailed(Func<IComputeTask<int, int>,
CancellationToken, Task<int>> executor)
+ {
+ CancelledJobCounter = 0;
+ ExecutedJobCounter = 0;
+
+ JobStartedLatch = new CountdownEvent(1);
+ JobUnblockedLatch = new CountdownEvent(1);
+
+ using var cts = new CancellationTokenSource();
+
+ var fut = executor.Invoke(new ExecuteAllowedTask(), cts.Token);
+
+ JobStartedLatch.Wait(5000);
+
+ AssertAuthorizationException(() => cts.Cancel());
+
+ Assert.False(fut.IsCanceled);
+
+ JobUnblockedLatch.Signal();
+
+ Assert.AreEqual(0, CancelledJobCounter);
+ TestUtils.WaitForTrueCondition(() => 1 == ExecutedJobCounter);
+ }
+
+ private void CheckTaskCancelSucceeded(Func<IComputeTask<int, int>,
CancellationToken, Task<int>> executor)
+ {
+ CancelledJobCounter = 0;
+ ExecutedJobCounter = 0;
+
+ JobStartedLatch = new CountdownEvent(1);
+ JobUnblockedLatch = new CountdownEvent(1);
+
+ using var cts = new CancellationTokenSource();
+
+ var fut = executor.Invoke(new ExecuteCancelAllowedTask(),
cts.Token);
+
+ JobStartedLatch.Wait(5000);
+
+ cts.Cancel();
+
+ TestUtils.WaitForTrueCondition(() => fut.IsCanceled);
+
+ JobUnblockedLatch.Signal();
+
+ Assert.AreEqual(1, CancelledJobCounter);
+ Assert.AreEqual(0, ExecutedJobCounter);
+ }
+
+ private void CheckExecutionSucceeded(Action<CancellationToken> action)
+ {
+ ExecutedJobCounter = 0;
+
+ using var cts = new CancellationTokenSource();
+
+ action(cts.Token);
+
+ Assert.AreEqual(1, ExecutedJobCounter);
+ }
+
+ private void CheckExecutionFailed(Action<CancellationToken> action)
+ {
+ ExecutedJobCounter = 0;
+
+ using var cts = new CancellationTokenSource();
+
+ var token = cts.Token;
+
+ AssertAuthorizationException(() => action(token));
+
+ Assert.AreEqual(0, ExecutedJobCounter);
+ }
+
+ private void AssertAuthorizationException(TestDelegate action)
+ {
+ var ex = Assert.Catch(action) ;
+
+ Assert.NotNull(ex);
+ StringAssert.Contains("Authorization failed",
ex.GetBaseException().Message);
+ }
+ }
+
+ public class TestReducer : IComputeReducer<int, object>
+ {
+ /** <inheritdoc /> */
+ public bool Collect(int res)
+ {
+ return true;
+ }
+
+ /** <inheritdoc /> */
+ public object Reduce()
+ {
+ return null;
+ }
+ }
+
+ public class ExecuteAllowedAction : AbstractAction { }
+
+ public class ExecuteForbiddenAction : AbstractAction { }
+
+ public abstract class AbstractAction : IComputeAction
+ {
+ /** <inheritDoc /> */
+ public void Invoke()
+ {
+ Interlocked.Increment(ref ExecutedJobCounter);
+ }
+ }
+
+ public class ExecuteAllowedFunction : AbstractFunction { }
+
+ public class ExecuteForbiddenFunction : AbstractFunction { }
+
+ public abstract class AbstractFunction : IComputeFunc<int, int>
+ {
+ /** <inheritDoc /> */
+ public int Invoke(int arg)
+ {
+ Interlocked.Increment(ref ExecutedJobCounter);
+
+ return 42;
+ }
+ }
+
+ public class ExecuteAllowedCallable : AbstractCallable { }
+
+ public class ExecuteForbiddenCallable : AbstractCallable { }
+
+ public abstract class AbstractCallable : IComputeFunc<int>
+ {
+ public int Invoke()
+ {
+ Interlocked.Increment(ref ExecutedJobCounter);
+
+ return 42;
+ }
+ }
+
+ public class ExecuteAllowedTask : AbstractTask { }
+
+ public class ExecuteCancelAllowedTask : AbstractTask { }
+
+ public class ExecuteForbiddenTask : AbstractTask { }
+
+ [Serializable]
+ public abstract class AbstractTask : IComputeTask<int, int>
+ {
+ public IDictionary<IComputeJob<int>, IClusterNode>
Map(IList<IClusterNode> subgrid, object arg)
+ {
+ return subgrid.ToDictionary(x => (IComputeJob<int>)new Job(), x =>
x);
+ }
+
+ public ComputeJobResultPolicy OnResult(IComputeJobResult<int> res,
IList<IComputeJobResult<int>> rcvd)
+ {
+ return ComputeJobResultPolicy.Wait;
+ }
+
+ public int Reduce(IList<IComputeJobResult<int>> results)
+ {
+ foreach (var res in results)
+ {
+ Exception err = res.Exception;
+
+ if (err != null)
+ {
+ throw err;
+ }
+ }
+
+ return 42;
+ }
+
+ [Serializable]
+ public class Job : IComputeJob<int>
+ {
+ public static CountdownEvent JobStartedLatch;
+
+ public static CountdownEvent JobUnblockedLatch;
+
+ private bool isCancelled;
+
+ /** <inheritdoc /> */
+ public int Execute()
+ {
+ JobStartedLatch?.Signal();
+ JobUnblockedLatch?.Wait(5000);
+
+ if (!isCancelled)
+ Interlocked.Increment(ref ExecutedJobCounter);
+
+ return 42;
+ }
+
+ /** <inheritdoc /> */
+ public void Cancel()
+ {
+ isCancelled = true;
+
+ Interlocked.Increment(ref CancelledJobCounter);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Compute/compute-security.xml
b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Compute/compute-security.xml
new file mode 100644
index 00000000000..164ffde3379
--- /dev/null
+++
b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Compute/compute-security.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation=
+ "http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ https://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="ignite.cfg"
class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="pluginProviders">
+ <list>
+ <bean
class="org.apache.ignite.internal.processors.security.impl.TestSecurityPluginProvider">
+ <constructor-arg name="login" value="login" />
+ <constructor-arg name="pwd" value="pwd" />
+ <constructor-arg name="perms">
+ <bean
class="org.apache.ignite.plugin.security.SecurityBasicPermissionSet">
+ <property name="systemPermissions">
+ <util:list
value-type="org.apache.ignite.plugin.security.SecurityPermission">
+ <value>JOIN_AS_SERVER</value>
+ <value>CACHE_CREATE</value>
+ </util:list>
+ </property>
+ <property name="taskPermissions">
+ <util:map map-class="java.util.HashMap"
key-type="java.lang.String" value-type="java.util.Collection">
+ <entry
key="Apache.Ignite.Core.Tests.Compute.ExecuteAllowedTask"
value-ref="task-execute-permission"/>
+ <entry
key="Apache.Ignite.Core.Tests.Compute.ExecuteAllowedFunction"
value-ref="task-execute-permission"/>
+ <entry
key="Apache.Ignite.Core.Tests.Compute.ExecuteAllowedCallable"
value-ref="task-execute-permission"/>
+ <entry
key="Apache.Ignite.Core.Tests.Compute.ExecuteAllowedAction"
value-ref="task-execute-permission"/>
+ <entry
key="Apache.Ignite.Core.Tests.Compute.ExecuteCancelAllowedTask"
value-ref="task-execute-cancel-permission"/>
+ </util:map>
+ </property>
+ </bean>
+ </constructor-arg>
+ <constructor-arg name="globalAuth" value="false" />
+ <constructor-arg name="clientData">
+ <list
value-type="org.apache.ignite.internal.processors.security.impl.TestSecurityData"/>
+ </constructor-arg>
+ </bean>
+ </list>
+ </property>
+ </bean>
+
+ <util:list id="task-execute-permission"
value-type="org.apache.ignite.plugin.security.SecurityPermission">
+ <value>TASK_EXECUTE</value>
+ </util:list>
+
+ <util:list id="task-execute-cancel-permission"
value-type="org.apache.ignite.plugin.security.SecurityPermission">
+ <value>TASK_EXECUTE</value>
+ <value>TASK_CANCEL</value>
+ </util:list>
+</beans>
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeActionJob.cs
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeActionJob.cs
index e44018bf49e..1dae0f8d87c 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeActionJob.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeActionJob.cs
@@ -23,6 +23,7 @@ namespace Apache.Ignite.Core.Impl.Compute.Closure
using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Deployment;
using Apache.Ignite.Core.Impl.Resource;
+ using static IgniteUtils;
/// <summary>
/// System job which wraps over <c>Action</c>.
@@ -77,5 +78,11 @@ namespace Apache.Ignite.Core.Impl.Compute.Closure
{
_action = (IComputeAction) reader.ReadObject<object>();
}
+
+ /** <inheritDoc /> */
+ public string GetName()
+ {
+ return GetComputeExecutableName(_action);
+ }
}
}
\ No newline at end of file
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeFuncJob.cs
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeFuncJob.cs
index 3c7cec86cac..57f277a8a69 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeFuncJob.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeFuncJob.cs
@@ -81,5 +81,11 @@ namespace Apache.Ignite.Core.Impl.Compute.Closure
_clo = reader.ReadObject<IComputeFunc>();
_arg = reader.ReadObject<object>();
}
+
+ /** <inheritDoc /> */
+ public string GetName()
+ {
+ return _clo.GetName();
+ }
}
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeOutFuncJob.cs
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeOutFuncJob.cs
index af03e3d29d7..17827858dd2 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeOutFuncJob.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeOutFuncJob.cs
@@ -21,6 +21,7 @@ namespace Apache.Ignite.Core.Impl.Compute.Closure
using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Resource;
+ using static IgniteUtils;
/// <summary>
/// System job which wraps over <c>Func</c>.
@@ -72,5 +73,11 @@ namespace Apache.Ignite.Core.Impl.Compute.Closure
{
_clo = reader.ReadObject<IComputeOutFunc>();
}
+
+ /** <inheritDoc /> */
+ public string GetName()
+ {
+ return _clo.GetName();
+ }
}
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeFunc.cs
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeFunc.cs
index 44f2880e3e9..2fa0dc86b25 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeFunc.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeFunc.cs
@@ -26,13 +26,15 @@ namespace Apache.Ignite.Core.Impl.Compute
using Apache.Ignite.Core.Impl.Deployment;
using Apache.Ignite.Core.Impl.Resource;
using Apache.Ignite.Core.Resource;
+ using static IgniteUtils;
/// <summary>
/// Non-generic version of IComputeFunc{T}.
/// </summary>
internal interface IComputeFunc : IComputeFunc<object, object>
{
- // No-op
+ /// <returns>Name of the wrapped function.</returns>
+ string GetName();
}
/// <summary>
@@ -103,6 +105,12 @@ namespace Apache.Ignite.Core.Impl.Compute
// Propagate injection
ResourceProcessor.Inject(_func, (Ignite) ignite);
}
+
+ /** <inheritDoc /> */
+ public string GetName()
+ {
+ return GetComputeExecutableName(_func);
+ }
}
/// <summary>
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
index 153775c4f1b..af2f793000a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
@@ -36,6 +36,7 @@ namespace Apache.Ignite.Core.Impl.Compute
using Apache.Ignite.Core.Impl.Common;
using Apache.Ignite.Core.Impl.Compute.Closure;
using Apache.Ignite.Core.Impl.Deployment;
+ using static IgniteUtils;
/// <summary>
/// Compute implementation.
@@ -231,10 +232,11 @@ namespace Apache.Ignite.Core.Impl.Compute
long ptr = Marshaller.Ignite.HandleRegistry.Allocate(holder);
- var futTarget = DoOutOpObject(OpExecNative, (IBinaryStream s) =>
+ var futTarget = DoOutOpObject(OpExecNative, s =>
{
s.WriteLong(ptr);
s.WriteLong(_prj.TopologyVersion);
+ s.WriteString(GetComputeExecutableName(task));
});
var future = holder.Future;
@@ -562,6 +564,7 @@ namespace Apache.Ignite.Core.Impl.Compute
w.WriteWithPeerDeployment(func);
w.WriteLong(handle);
+ w.WriteString(GetComputeExecutableName(func));
});
fut.Task.ContWith(_ => handleRegistry.Release(handle),
TaskContinuationOptions.ExecuteSynchronously);
@@ -704,6 +707,8 @@ namespace Apache.Ignite.Core.Impl.Compute
throw;
}
+
+ writer.WriteString(job.GetName());
return jobHandle;
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs
index 57e234c3752..c94ebcdad36 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs
@@ -26,13 +26,15 @@ namespace Apache.Ignite.Core.Impl.Compute
using Apache.Ignite.Core.Impl.Deployment;
using Apache.Ignite.Core.Impl.Resource;
using Apache.Ignite.Core.Resource;
+ using static IgniteUtils;
/// <summary>
/// Non-generic version of IComputeJob{T}.
/// </summary>
internal interface IComputeJob : IComputeJob<object>
{
- // No-op.
+ /// <returns>Name of the wrapped job.</returns>
+ string GetName();
}
/// <summary>
@@ -103,6 +105,12 @@ namespace Apache.Ignite.Core.Impl.Compute
throw;
}
}
+
+ /** <inheritDoc /> */
+ public string GetName()
+ {
+ return GetComputeExecutableName(_job);
+ }
/** <inheritDoc /> */
public void WriteBinary(IBinaryWriter writer)
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeOutFunc.cs
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeOutFunc.cs
index 3d592459207..f6d9168df14 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeOutFunc.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeOutFunc.cs
@@ -27,13 +27,15 @@ namespace Apache.Ignite.Core.Impl.Compute
using Apache.Ignite.Core.Impl.Deployment;
using Apache.Ignite.Core.Impl.Resource;
using Apache.Ignite.Core.Resource;
+ using static IgniteUtils;
/// <summary>
/// Non-generic version of IComputeFunc{T}.
/// </summary>
internal interface IComputeOutFunc : IComputeFunc<object>
{
- // No-op.
+ /// <returns>Name of the wrapped function.</returns>
+ string GetName();
}
/// <summary>
@@ -106,6 +108,12 @@ namespace Apache.Ignite.Core.Impl.Compute
// Propagate injection
ResourceProcessor.Inject(_func, (Ignite)ignite);
}
+
+ /** <inheritDoc /> */
+ public string GetName()
+ {
+ return GetComputeExecutableName(_func);
+ }
}
/// <summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs
index 15a35a39815..bf4db733138 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs
@@ -28,6 +28,7 @@ namespace Apache.Ignite.Core.Impl
using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Cluster;
using Apache.Ignite.Core.Impl.Common;
+ using Apache.Ignite.Core.Impl.Compute;
using BinaryReader = Apache.Ignite.Core.Impl.Binary.BinaryReader;
/// <summary>
@@ -231,5 +232,12 @@ namespace Apache.Ignite.Core.Impl
return res & ~platformCache;
}
+
+ /// <param name="obj">Compute executable.</param>
+ /// <returns>FQN of the specified compute executable.</returns>
+ public static string GetComputeExecutableName(object obj)
+ {
+ return obj.GetType().FullName;
+ }
}
}