This is an automated email from the ASF dual-hosted git repository.
mpetrov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 6ab4f7009cf IGNITE-26608 Added a unified mechanism for propagating
Operation Context Attributes (#12429)
6ab4f7009cf is described below
commit 6ab4f7009cfd37b18226e6132fe0dcfa9f0802f2
Author: Mikhail Petrov <[email protected]>
AuthorDate: Tue Feb 24 11:52:43 2026 +0300
IGNITE-26608 Added a unified mechanism for propagating Operation Context
Attributes (#12429)
---
.../integration/AbstractBasicIntegrationTest.java | 4 +-
.../ignite/common/ComputeTaskPermissionsTest.java | 6 +-
.../internal/thread/context/OperationContext.java | 370 ++++++++++++
.../thread/context/OperationContextAttribute.java | 109 ++++
.../thread/context/OperationContextSnapshot.java} | 38 +-
.../ignite/internal/thread/context/Scope.java | 44 ++
.../concurrent/OperationContextAwareExecutor.java | 49 ++
.../function/OperationContextAwareCallable.java | 72 +++
.../function/OperationContextAwareRunnable.java | 59 ++
.../function/OperationContextAwareWrapper.java | 61 ++
.../ignite/internal/IgniteSchedulerImpl.java | 6 +-
.../ignite/internal/SecurityAwareBiPredicate.java | 4 +-
.../ignite/internal/SecurityAwarePredicate.java | 4 +-
.../consistency/ConsistencyRepairTask.java | 4 +-
.../managers/communication/GridIoManager.java | 6 +-
.../managers/discovery/GridDiscoveryManager.java | 8 +-
.../processors/cache/GridCacheAdapter.java | 4 +-
.../cache/GridCachePartitionExchangeManager.java | 6 +-
.../cache/ValidationOnNodeJoinUtils.java | 4 +-
.../query/continuous/SecurityAwareFilter.java | 4 +-
.../SecurityAwareTransformerFactory.java | 4 +-
.../internal/processors/job/GridJobWorker.java | 6 +-
.../processors/odbc/ClientListenerNioListener.java | 6 +-
.../internal/processors/pool/PoolProcessor.java | 19 +-
.../processors/rest/GridRestProcessor.java | 4 +-
.../processors/security/IgniteSecurity.java | 15 +-
.../security/IgniteSecurityProcessor.java | 42 +-
.../security/NoOpIgniteSecurityProcessor.java | 16 +-
.../processors/security/SecurityUtils.java | 5 +-
.../security/thread/SecurityAwareCallable.java | 76 ---
.../security/thread/SecurityAwareRunnable.java | 62 --
.../processors/service/IgniteServiceProcessor.java | 4 +-
.../pool/OperationContextAwareIoPool.java} | 32 +-
.../OperationContextAwareStripedExecutor.java} | 43 +-
...tionContextAwareStripedThreadPoolExecutor.java} | 28 +-
.../OperationContextAwareThreadPoolExecutor.java} | 47 +-
.../AuthenticationProcessorSelfTest.java | 8 +-
.../context/OperationContextAttributesTest.java | 635 +++++++++++++++++++++
.../ignite/testsuites/SecurityTestSuite.java | 2 +
.../authentication/SqlUserCommandSelfTest.java | 4 +-
40 files changed, 1547 insertions(+), 373 deletions(-)
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
index 0c23f2146c0..7e64f373575 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
@@ -44,8 +44,8 @@ import
org.apache.ignite.internal.processors.query.calcite.prepare.bounds.Search
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.processors.security.OperationSecurityContext;
import org.apache.ignite.internal.processors.security.SecurityContext;
+import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -236,7 +236,7 @@ public class AbstractBasicIntegrationTest extends
GridCommonAbstractTest {
protected List<List<?>> sqlAsRoot(IgniteEx ignite, String sql) throws
Exception {
SecurityContext secCtx = authenticate(grid(0), DFAULT_USER_NAME,
"ignite");
- try (OperationSecurityContext ignored =
ignite.context().security().withContext(secCtx)) {
+ try (Scope ignored = ignite.context().security().withContext(secCtx)) {
return sql(ignite, sql);
}
}
diff --git
a/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java
b/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java
index 02321a705a2..7634b435645 100644
---
a/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java
+++
b/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java
@@ -54,12 +54,12 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.management.cache.VerifyBackupPartitionsTask;
import org.apache.ignite.internal.processors.security.AbstractSecurityTest;
import
org.apache.ignite.internal.processors.security.AbstractTestSecurityPluginProvider;
-import org.apache.ignite.internal.processors.security.OperationSecurityContext;
import org.apache.ignite.internal.processors.security.PublicAccessJob;
import org.apache.ignite.internal.processors.security.SecurityContext;
import
org.apache.ignite.internal.processors.security.compute.ComputePermissionCheckTest;
import org.apache.ignite.internal.processors.security.impl.TestSecurityData;
import
org.apache.ignite.internal.processors.security.impl.TestSecurityPluginProvider;
+import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.internal.util.lang.ConsumerX;
import org.apache.ignite.internal.util.lang.RunnableX;
import
org.apache.ignite.internal.util.lang.gridfunc.AtomicIntegerFactoryCallable;
@@ -476,7 +476,7 @@ public class ComputeTaskPermissionsTest extends
AbstractSecurityTest {
SecurityContext initiatorSecCtx =
securityContext("no-permissions-login-0");
SupplierX<Future<?>> starter = () -> {
- try (OperationSecurityContext ignored1 =
grid(0).context().security().withContext(initiatorSecCtx)) {
+ try (Scope ignored1 =
grid(0).context().security().withContext(initiatorSecCtx)) {
return new TestFutureAdapter<>(
grid(0).context().closure().runAsync(
BROADCAST,
@@ -525,7 +525,7 @@ public class ComputeTaskPermissionsTest extends
AbstractSecurityTest {
assertTrue(taskStartedLatch.await(getTestTimeout(), MILLISECONDS));
try (
- OperationSecurityContext ignored = initiator == null
+ Scope ignored = initiator == null
? null
: grid(0).context().security().withContext(initiator)
) {
diff --git
a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java
new file mode 100644
index 00000000000..6953d8b8538
--- /dev/null
+++
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread.context;
+
+import java.util.ArrayList;
+import java.util.List;
+import
org.apache.ignite.internal.thread.context.concurrent.OperationContextAwareExecutor;
+import
org.apache.ignite.internal.thread.context.function.OperationContextAwareCallable;
+import
org.apache.ignite.internal.thread.context.function.OperationContextAwareRunnable;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.thread.context.Scope.NOOP_SCOPE;
+
+/**
+ * Represents a storage of {@link OperationContextAttribute}s and their
corresponding values bound to the JVM thread.
+ * The state of {@link OperationContext} is determined by a sequence of {@link
Update}s applied to it. Each Update
+ * stores the updated or newly added {@link OperationContextAttribute} values
and link to the previous Update.
+ * <pre>
+ * +-----------+ +-----------+
+ * | | | A1 -> V2 |
+ * null <--| A1 -> V1 |<--| |
+ * | | | A2 -> V3 |
+ * +-----------+ +-----------+
+ *</pre>
+ * {@link OperationContext} Updates can be undone in the same order they were
applied by closing the {@link Scope}
+ * associated with each update (see {@link #set(OperationContextAttribute,
Object)} and related methods).
+ *<p>
+ * {@link OperationContext} bound to one JVM thread can be saved and restored
in another thread using the snapshot
+ * mechanism (see {@link #createSnapshot()} and {@link
#restoreSnapshot(OperationContextSnapshot) methods}). This
+ * provides basic functionality for implementing asynchronous executors that
automatically propagate
+ * {@link OperationContext} data between JVM threads.
+ *</p>
+ *
+ * @see Scope
+ * @see OperationContextSnapshot
+ * @see OperationContextAwareCallable
+ * @see OperationContextAwareRunnable
+ * @see OperationContextAwareExecutor
+ */
+public class OperationContext {
+ /** */
+ private static final ThreadLocal<OperationContext> INSTANCE =
ThreadLocal.withInitial(OperationContext::new);
+
+ /**
+ * Sequence of updated applied to the {@link OperationContext}. Each
update holds a link to the previous Update,
+ * so we store only the reference to the last one.
+ */
+ @Nullable private Update lastUpd;
+
+ /** */
+ private OperationContext() {
+ // No-op.
+ }
+
+ /**
+ * Retrieves value associated with specified attribute by accessing {@link
OperationContext} bound to the thread
+ * this method is called from. If no value is explicitly associated with
specified attribute,
+ * {@link OperationContextAttribute#initialValue()} is returned.
+ *
+ * @param attr Context Attribute.
+ * @return Context Attribute Value.
+ */
+ @Nullable public static <T> T get(OperationContextAttribute<T> attr) {
+ return INSTANCE.get().getInternal(attr);
+ }
+
+ /**
+ * Updates the value of the specified attribute for the {@link
OperationContext} bound to the thread this method
+ * is called from.
+ *
+ * @param attr Context Attribute.
+ * @return Scope instance that, when closed, undoes the applied update. It
is crucial to undo all applied
+ * {@link OperationContext} updates to free up thread-bound resources and
avoid memory leaks, so it is highly
+ * encouraged to use a try-with-resource block to close the returned
Scope. Note, updates must be undone in the
+ * same order and in the same thread they were applied.
+ */
+ public static <T> Scope set(OperationContextAttribute<T> attr, T val) {
+ OperationContext ctx = INSTANCE.get();
+
+ return ctx.getInternal(attr) == val ? NOOP_SCOPE :
ctx.applyAttributeUpdates(new AttributeValueHolder<>(attr, val));
+ }
+
+ /**
+ * Updates the values of the specified attributes for the {@link
OperationContext} bound to the thread this method
+ * is called from.
+ *
+ * @param attr1 First Context Attribute.
+ * @param val1 Values associated with first Context Attribute.
+ * @param attr2 Second Context Attribute.
+ * @param val2 Values associated with second Context Attribute.
+ * @return Scope instance that, when closed, undoes the applied update. It
is crucial to undo all applied
+ * {@link OperationContext} updates to free up thread-bound resources and
avoid memory leaks, so it is highly
+ * encouraged to use a try-with-resource block to close the returned
Scope. Note, updates must be undone in the
+ * same order and in the same thread they were applied.
+ */
+ public static <T1, T2> Scope set(
+ OperationContextAttribute<T1> attr1, T1 val1,
+ OperationContextAttribute<T2> attr2, T2 val2
+ ) {
+ return ContextUpdater.create().set(attr1, val1).set(attr2,
val2).apply();
+ }
+
+ /**
+ * Updates the values of the specified attributes for the {@link
OperationContext} bound to the thread this method
+ * is called from.
+ *
+ * @param attr1 First Context Attribute.
+ * @param val1 Values associated with first Context Attribute.
+ * @param attr2 Second Context Attribute.
+ * @param val2 Values associated with second Context Attribute.
+ * @param attr3 Third Context Attribute.
+ * @param val3 Values associated with third Context Attribute.
+ * @return Scope instance that, when closed, undoes the applied update. It
is crucial to undo all applied
+ * {@link OperationContext} updates to free up thread-bound resources and
avoid memory leaks, so it is highly
+ * encouraged to use a try-with-resource block to close the returned
Scope. Note, updates must be undone in the
+ * same order and in the same thread they were applied.
+ */
+ public static <T1, T2, T3> Scope set(
+ OperationContextAttribute<T1> attr1, T1 val1,
+ OperationContextAttribute<T2> attr2, T2 val2,
+ OperationContextAttribute<T3> attr3, T3 val3
+ ) {
+ return ContextUpdater.create().set(attr1, val1).set(attr2,
val2).set(attr3, val3).apply();
+ }
+
+ /**
+ * Creates Snapshot of all attributes and their corresponding values
stored in the {@link OperationContext} bound
+ * the thread this method is called from.
+ *
+ * @return Context Snapshot.
+ */
+ public static OperationContextSnapshot createSnapshot() {
+ return INSTANCE.get().createSnapshotInternal();
+ }
+
+ /**
+ * Restores values of all attributes for {@link OperationContext} bound to
the thread this method is called from.
+ *
+ * @param snp Context Snapshot.
+ * @return Scope instance that, when closed, undoes the applied operation.
It is crucial to undo all applied
+ * {@link OperationContext} updates to free up thread-bound resources and
avoid memory leaks, so it is highly
+ * encouraged to use a try-with-resource block to close the returned
Scope. Note, updates must be undone in the
+ * same order and in the same thread they were applied.
+ */
+ public static Scope restoreSnapshot(OperationContextSnapshot snp) {
+ return INSTANCE.get().restoreSnapshotInternal(snp);
+ }
+
+ /**
+ * Retrieves value for the specified attribute from the current {@link
OperationContext}. If no value is explicitly
+ * associated with specified attribute, {@link
OperationContextAttribute#initialValue()} is returned.
+ */
+ @Nullable private <T> T getInternal(OperationContextAttribute<T> attr) {
+ if (lastUpd == null || (lastUpd.storedAttrBits & attr.bitmask()) == 0)
+ return attr.initialValue(); // OperationContext does not store
value for the specified attribute.
+
+ AttributeValueHolder<T> valHolder = findAttributeValue(attr);
+
+ assert valHolder != null;
+ assert valHolder.attr.equals(attr);
+
+ return valHolder.val;
+ }
+
+ /** Updates the current context with the specified attributes and their
corresponding values. */
+ private Scope applyAttributeUpdates(AttributeValueHolder<?>... attrVals) {
+ lastUpd = new Update(attrVals, lastUpd);
+
+ return lastUpd;
+ }
+
+ /** Undoes the latest updated. */
+ private void undo(Update upd) {
+ assert lastUpd == upd;
+
+ lastUpd = lastUpd.prev;
+ }
+
+ /** Iterates over the currently applied context updates and finds the
latest value associated with the specified attribute. */
+ private <T> AttributeValueHolder<T>
findAttributeValue(OperationContextAttribute<T> attr) {
+ for (Update upd = lastUpd; upd != null; upd = upd.prev) {
+ if (!upd.holdsValueFor(attr))
+ continue;
+
+ return upd.value(attr);
+ }
+
+ return null;
+ }
+
+ /** */
+ private OperationContextSnapshot createSnapshotInternal() {
+ // The sequence of updates defines the state of the OperationContext.
Each update is linked to the previous
+ // one and immutable. Therefore, to restore the context state
elsewhere, we only need to share a reference to
+ // the most recent update.
+ return lastUpd;
+ }
+
+ /** */
+ private Scope restoreSnapshotInternal(OperationContextSnapshot newSnp) {
+ OperationContextSnapshot prevSnp = createSnapshotInternal();
+
+ if (newSnp == prevSnp)
+ return NOOP_SCOPE;
+
+ changeState(prevSnp, newSnp);
+
+ return () -> changeState(newSnp, prevSnp);
+ }
+
+ /** */
+ private void changeState(OperationContextSnapshot expState,
OperationContextSnapshot newState) {
+ assert lastUpd == expState;
+
+ lastUpd = (Update)newState;
+ }
+
+ /** Represents Update applied to the {@link OperationContext}. */
+ private class Update implements Scope, OperationContextSnapshot {
+ /** Updated attributes and their corresponding values. */
+ private final AttributeValueHolder<?>[] attrVals;
+
+ /**
+ * Bits representing all attributes which values were changed by this
update.
+ *
+ * @see OperationContextAttribute#bitmask()
+ */
+ private final int updAttrBits;
+
+ /**
+ * Bits representing all attributes stored in the current {@link
OperationContext} after this Update and all
+ * preceding are applied. We need this for two purposes:
+ * <ul>
+ * <li>fast check whether any of the currently applied {@link
OperationContext} Updates store value for the
+ * particular attribute</li>
+ * <li>do not recalculate state of all attributes when update is
undone</li>
+ * </ul>
+ *
+ * @see OperationContextAttribute#bitmask()
+ */
+ private final int storedAttrBits;
+
+ /** Link to the previous update. */
+ private final Update prev;
+
+ /** */
+ Update(AttributeValueHolder<?>[] attrVals, Update prev) {
+ this.attrVals = attrVals;
+ this.prev = prev;
+
+ updAttrBits = mergeUpdatedAttributeBits(attrVals);
+ storedAttrBits = prev == null ? updAttrBits : prev.storedAttrBits
| updAttrBits;
+ }
+
+ /** @return Whether current update contains value for the specified
attribute. */
+ boolean holdsValueFor(OperationContextAttribute<?> attr) {
+ return (updAttrBits & attr.bitmask()) != 0;
+ }
+
+ /**
+ * @return Attribute value that was set by the current update for the
specified attribute. {@code null} if
+ * specified Attribute was not changed by this update.
+ */
+ @Nullable <T> AttributeValueHolder<T>
value(OperationContextAttribute<T> attr) {
+ // We iterate in reverse order to correctly handle the case when
the value for the same attribute is
+ // specified multiple times.
+ for (int i = attrVals.length - 1; i >= 0; i--) {
+ AttributeValueHolder<?> valHolder = attrVals[i];
+
+ if (valHolder.attr.equals(attr))
+ return ((AttributeValueHolder<T>)valHolder);
+ }
+
+ return null;
+ }
+
+ /** */
+ private int mergeUpdatedAttributeBits(AttributeValueHolder<?>[]
attrVals) {
+ int res = 0;
+
+ for (AttributeValueHolder<?> valHolder : attrVals)
+ res |= valHolder.attr.bitmask();
+
+ return res;
+ }
+
+ /** */
+ @Override public void close() {
+ undo(this);
+ }
+ }
+
+ /** Immutable container that stores an attribute and its corresponding
value. */
+ private static class AttributeValueHolder<T> {
+ /** */
+ private final OperationContextAttribute<T> attr;
+
+ /** */
+ private final T val;
+
+ /** */
+ AttributeValueHolder(OperationContextAttribute<T> attr, T val) {
+ this.attr = attr;
+ this.val = val;
+ }
+ }
+
+ /** Allows to change multiple attribute values in a single update
operation and skip updates that changes nothing. */
+ private static class ContextUpdater {
+ /** */
+ private static final int INIT_UPDATES_CAPACITY = 3;
+
+ /** */
+ private final OperationContext ctx;
+
+ /** */
+ private List<AttributeValueHolder<?>> updates;
+
+ /** */
+ private ContextUpdater(OperationContext ctx) {
+ this.ctx = ctx;
+ }
+
+ /** */
+ <T> ContextUpdater set(OperationContextAttribute<T> attr, T val) {
+ if (ctx.getInternal(attr) == val)
+ return this;
+
+ if (updates == null)
+ updates = new ArrayList<>(INIT_UPDATES_CAPACITY);
+
+ updates.add(new AttributeValueHolder<>(attr, val));
+
+ return this;
+ }
+
+ /** */
+ Scope apply() {
+ if (F.isEmpty(updates))
+ return NOOP_SCOPE;
+
+ AttributeValueHolder<?>[] sealedUpdates = new
AttributeValueHolder[updates.size()];
+
+ updates.toArray(sealedUpdates);
+
+ return ctx.applyAttributeUpdates(sealedUpdates);
+ }
+
+ /** */
+ static ContextUpdater create() {
+ return new ContextUpdater(INSTANCE.get());
+ }
+ }
+}
diff --git
a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextAttribute.java
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextAttribute.java
new file mode 100644
index 00000000000..499d241d9cc
--- /dev/null
+++
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextAttribute.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread.context;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Represents a key to access and modify {@link OperationContext} records.
+ *
+ * @see OperationContext
+ * @see OperationContext#get(OperationContextAttribute)
+ * @see OperationContext#set(OperationContextAttribute, Object)
+ */
+public class OperationContextAttribute<T> {
+ /** */
+ static final AtomicInteger ID_GEN = new AtomicInteger();
+
+ /** */
+ static final int MAX_ATTR_CNT = Integer.SIZE;
+
+ /** */
+ private final int bitmask;
+
+ /** */
+ @Nullable private final T initVal;
+
+ /** */
+ private OperationContextAttribute(int bitmask, @Nullable T initVal) {
+ this.bitmask = bitmask;
+ this.initVal = initVal;
+ }
+
+ /**
+ * Initial Value associated with the current Attribute. Initial value will
be automatically returned by the
+ * {@link OperationContext#get} method if Attribute's value has not been
previously set.
+ * @see OperationContext#get(OperationContextAttribute)
+ */
+ @Nullable public T initialValue() {
+ return initVal;
+ }
+
+ /**
+ * Unique attribute bitmask calculated by shifting one from 0 to {@link
Integer#SIZE}. It provides an ability to
+ * use {@link OperationContext} Attribute with bit fields.
+ */
+ int bitmask() {
+ return bitmask;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object other) {
+ if (this == other)
+ return true;
+
+ if (!(other instanceof OperationContextAttribute))
+ return false;
+
+ return bitmask == ((OperationContextAttribute<?>)other).bitmask;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return bitmask;
+ }
+
+ /**
+ * Creates new instance of the {@link OperationContext} Attribute with
Initial Value set to {@code null}.
+ * <p>
+ * Note, that the maximum number of attribute instances that can be
created is currently limited to
+ * {@link #MAX_ATTR_CNT} for implementation reasons.
+ * </p>
+ */
+ public static <T> OperationContextAttribute<T> newInstance() {
+ return newInstance(null);
+ }
+
+ /**
+ * Creates new instance of the {@link OperationContext} Attribute with the
specified Initial Value. The Initial
+ * Value is returned by {@link OperationContext#get} method if the
Attribute's value is not explicitly set in the
+ * {@link OperationContext}.
+ * <p>
+ * Note, that the maximum number of attribute instances that can be
created is currently limited to
+ * {@link #MAX_ATTR_CNT} for implementation reasons.
+ * </p>
+ */
+ public static <T> OperationContextAttribute<T> newInstance(T initVal) {
+ int id = ID_GEN.getAndIncrement();
+
+ assert id < MAX_ATTR_CNT : "Exceeded maximum supported number of
created Attributes instances [maxCnt=" + MAX_ATTR_CNT + ']';
+
+ return new OperationContextAttribute<>(1 << id, initVal);
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/OperationSecurityContext.java
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextSnapshot.java
similarity index 51%
rename from
modules/core/src/main/java/org/apache/ignite/internal/processors/security/OperationSecurityContext.java
rename to
modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextSnapshot.java
index 7f65d942948..d863aa3f11a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/OperationSecurityContext.java
+++
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextSnapshot.java
@@ -15,32 +15,22 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.security;
+package org.apache.ignite.internal.thread.context;
+
+import
org.apache.ignite.internal.thread.context.function.OperationContextAwareCallable;
+import
org.apache.ignite.internal.thread.context.function.OperationContextAwareRunnable;
/**
+ * Represents snapshot of all Attributes and their corresponding values for a
particular {@link OperationContext}
+ * instance. Its main purpose to save {@link OperationContext} state and
restore it later, possible for
+ * {@link OperationContext} bound to another thread.
*
+ * @see OperationContext
+ * @see OperationContext#createSnapshot()
+ * @see OperationContext#restoreSnapshot(OperationContextSnapshot)
+ * @see OperationContextAwareCallable
+ * @see OperationContextAwareRunnable
*/
-public class OperationSecurityContext implements AutoCloseable {
- /** Ignite Security. */
- private final IgniteSecurity proc;
-
- /** Security context. */
- private final SecurityContext secCtx;
-
- /**
- * @param proc Ignite Security.
- * @param secCtx Security context.
- */
- OperationSecurityContext(IgniteSecurity proc, SecurityContext secCtx) {
- this.proc = proc;
- this.secCtx = secCtx;
- }
-
- /** {@inheritDoc} */
- @Override public void close() {
- if (secCtx == null)
- ((IgniteSecurityProcessor)proc).restoreDefaultContext();
- else
- proc.withContext(secCtx);
- }
+public interface OperationContextSnapshot {
+ // No-op.
}
diff --git
a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/Scope.java
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/Scope.java
new file mode 100644
index 00000000000..a48cf2e57ae
--- /dev/null
+++
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/Scope.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread.context;
+
+/**
+ * Represents the Scope of {@link OperationContext} attributes update.
Explicitly calling {@link #close()} method undoes
+ * the applied changes and restores previous attribute values, if any. Note
that every Scope relating to a specific
+ * {@link OperationContext} update must be closed to free up thread-bound
resources and avoid memory leaks, so it is
+ * highly encouraged to use a try-with-resource block with Scope instances.
+ * <p>
+ * Scope is result of the following {@link OperationContext} update operations:
+ * <ul>
+ * <li>{@link OperationContext#set(OperationContextAttribute, Object)} -
creates a new or update an existing mapping
+ * between specified {@link OperationContextAttribute} and its value</li>
+ * <li>{@link OperationContext#restoreSnapshot(OperationContextSnapshot)}
- updates {@link OperationContextAttribute}
+ * values to match the values stored in {@link
OperationContextSnapshot}</li>
+ * </ul>
+ * </p>
+ *
+ * @see OperationContext#set(OperationContextAttribute, Object)
+ * @see OperationContext#restoreSnapshot(OperationContextSnapshot)
+ */
+public interface Scope extends AutoCloseable {
+ /** Scope instance that does nothing when closed. */
+ Scope NOOP_SCOPE = () -> {};
+
+ /** Closes the scope. This operation cannot fail. */
+ @Override void close();
+}
diff --git
a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/concurrent/OperationContextAwareExecutor.java
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/concurrent/OperationContextAwareExecutor.java
new file mode 100644
index 00000000000..22ef5141739
--- /dev/null
+++
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/concurrent/OperationContextAwareExecutor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread.context.concurrent;
+
+import java.util.concurrent.Executor;
+import org.apache.ignite.internal.thread.context.OperationContext;
+import org.apache.ignite.internal.thread.context.OperationContextSnapshot;
+import
org.apache.ignite.internal.thread.context.function.OperationContextAwareRunnable;
+import org.jetbrains.annotations.NotNull;
+
+/** */
+public class OperationContextAwareExecutor implements Executor {
+ /** */
+ private final Executor delegate;
+
+ /** */
+ private OperationContextAwareExecutor(Executor delegate) {
+ this.delegate = delegate;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void execute(@NotNull Runnable command) {
+ delegate.execute(OperationContextAwareRunnable.wrap(command));
+ }
+
+ /**
+ * Creates executor wrapper that automatically captures {@link
OperationContextSnapshot} of {@link OperationContext}
+ * for the thread that invokes task execution. Captured {@link
OperationContextSnapshot} will be restored before
+ * task execution, potentially in another thread.
+ */
+ public static Executor wrap(Executor delegate) {
+ return delegate == null ? null : new
OperationContextAwareExecutor(delegate);
+ }
+}
diff --git
a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareCallable.java
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareCallable.java
new file mode 100644
index 00000000000..909a4d88dcb
--- /dev/null
+++
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareCallable.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread.context.function;
+
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.thread.context.OperationContext;
+import org.apache.ignite.internal.thread.context.OperationContextSnapshot;
+import org.apache.ignite.internal.thread.context.Scope;
+
+/** */
+public class OperationContextAwareCallable<T> extends
OperationContextAwareWrapper<Callable<T>> implements Callable<T> {
+ /** */
+ private OperationContextAwareCallable(Callable<T> delegate,
OperationContextSnapshot snapshot) {
+ super(delegate, snapshot);
+ }
+
+ /** {@inheritDoc} */
+ @Override public T call() throws Exception {
+ try (Scope ignored = OperationContext.restoreSnapshot(snapshot)) {
+ return delegate.call();
+ }
+ }
+
+ /**
+ * Creates a wrapper that stores a specified {@link Callable} along with
the {@link OperationContextSnapshot} of
+ * {@link OperationContext} bound to the thread when this method is
called. Captured
+ * {@link OperationContextSnapshot} will be restored before {@link
Callable} execution, potentially in another
+ * thread.
+ */
+ public static <T> Callable<T> wrap(Callable<T> delegate) {
+ return wrap(delegate, OperationContextAwareCallable::new);
+ }
+
+ /**
+ * Creates a wrapper that stores a specified {@link Callable} along with
the {@link OperationContextSnapshot} of
+ * {@link OperationContext} bound to the thread when this method is
called. Captured
+ * {@link OperationContextSnapshot} will be restored before {@link
Callable} execution, potentially in another
+ * thread.
+ * If {@link OperationContext} holds no data when this method is called,
it does nothing and returns original
+ * {@link Callable}.
+ */
+ public static <T> Callable<T> wrapIfContextNotEmpty(Callable<T> delegate) {
+ return wrap(delegate, OperationContextAwareCallable::new, true);
+ }
+
+ /** The same as {@link #wrap(Callable)} but wraps each specified {@link
Callable}. */
+ public static <T> Collection<Callable<T>> wrap(Collection<? extends
Callable<T>> tasks) {
+ return tasks == null ? null :
tasks.stream().map(OperationContextAwareCallable::wrap).collect(Collectors.toList());
+ }
+
+ /** The same as {@link #wrapIfContextNotEmpty(Callable)} but wraps each
specified {@link Callable}. */
+ public static <T> Collection<Callable<T>>
wrapIfContextNotEmpty(Collection<? extends Callable<T>> tasks) {
+ return tasks == null ? null :
tasks.stream().map(OperationContextAwareCallable::wrapIfContextNotEmpty).collect(Collectors.toList());
+ }
+}
diff --git
a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareRunnable.java
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareRunnable.java
new file mode 100644
index 00000000000..04f11ba7107
--- /dev/null
+++
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareRunnable.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread.context.function;
+
+import org.apache.ignite.internal.thread.context.OperationContext;
+import org.apache.ignite.internal.thread.context.OperationContextSnapshot;
+import org.apache.ignite.internal.thread.context.Scope;
+
+/** */
+public class OperationContextAwareRunnable extends
OperationContextAwareWrapper<Runnable> implements Runnable {
+ /** */
+ public OperationContextAwareRunnable(Runnable delegate,
OperationContextSnapshot snapshot) {
+ super(delegate, snapshot);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ try (Scope ignored = OperationContext.restoreSnapshot(snapshot)) {
+ delegate.run();
+ }
+ }
+
+ /**
+ * Creates a wrapper that stores a specified {@link Runnable} along with
the {@link OperationContextSnapshot} of
+ * {@link OperationContext} bound to the thread when this method is
called. Captured
+ * {@link OperationContextSnapshot} will be restored before {@link
Runnable} execution, potentially in another
+ * thread.
+ */
+ public static Runnable wrap(Runnable delegate) {
+ return wrap(delegate, OperationContextAwareRunnable::new);
+ }
+
+ /**
+ * Creates a wrapper that stores a specified {@link Runnable} along with
the {@link OperationContextSnapshot} of
+ * {@link OperationContext} bound to the thread when this method is
called. Captured
+ * {@link OperationContextSnapshot} will be restored before {@link
Runnable} execution, potentially in another
+ * thread.
+ * If {@link OperationContext} holds no data when this method is called,
it does nothing and returns original
+ * {@link Runnable}.
+ */
+ public static Runnable wrapIfContextNotEmpty(Runnable delegate) {
+ return wrap(delegate, OperationContextAwareRunnable::new, true);
+ }
+}
diff --git
a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareWrapper.java
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareWrapper.java
new file mode 100644
index 00000000000..a741d3b6d3c
--- /dev/null
+++
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareWrapper.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread.context.function;
+
+import java.util.function.BiFunction;
+import org.apache.ignite.internal.IgniteInternalWrapper;
+import org.apache.ignite.internal.thread.context.OperationContext;
+import org.apache.ignite.internal.thread.context.OperationContextSnapshot;
+
+/** */
+abstract class OperationContextAwareWrapper<T> implements
IgniteInternalWrapper<T> {
+ /** */
+ protected final T delegate;
+
+ /** */
+ protected final OperationContextSnapshot snapshot;
+
+ /** */
+ @Override public T delegate() {
+ return delegate;
+ }
+
+ /** */
+ protected OperationContextAwareWrapper(T delegate,
OperationContextSnapshot snapshot) {
+ this.delegate = delegate;
+ this.snapshot = snapshot;
+ }
+
+ /** */
+ protected static <T> T wrap(T delegate, BiFunction<T,
OperationContextSnapshot, T> wrapper) {
+ return wrap(delegate, wrapper, false);
+ }
+
+ /** */
+ protected static <T> T wrap(T delegate, BiFunction<T,
OperationContextSnapshot, T> wrapper, boolean ignoreEmptyContext) {
+ if (delegate == null || delegate instanceof
OperationContextAwareWrapper)
+ return delegate;
+
+ OperationContextSnapshot snapshot = OperationContext.createSnapshot();
+
+ if (ignoreEmptyContext && snapshot == null)
+ return delegate;
+
+ return wrapper.apply(delegate, snapshot);
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteSchedulerImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteSchedulerImpl.java
index 2092e1256ad..9456ecfcd0a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteSchedulerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteSchedulerImpl.java
@@ -27,8 +27,8 @@ import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteScheduler;
-import org.apache.ignite.internal.processors.security.OperationSecurityContext;
import org.apache.ignite.internal.processors.security.SecurityUtils;
+import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
@@ -211,7 +211,7 @@ public class IgniteSchedulerImpl implements
IgniteScheduler, Externalizable {
@Override public void run() {
assert runnable != null;
- try (OperationSecurityContext c =
ctx.security().withContext(secSubjId)) {
+ try (Scope ignored = ctx.security().withContext(secSubjId)) {
runnable.run();
}
}
@@ -220,7 +220,7 @@ public class IgniteSchedulerImpl implements
IgniteScheduler, Externalizable {
@Override public T call() throws Exception {
assert call != null;
- try (OperationSecurityContext c =
ctx.security().withContext(secSubjId)) {
+ try (Scope ignored = ctx.security().withContext(secSubjId)) {
return call.call();
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/SecurityAwareBiPredicate.java
b/modules/core/src/main/java/org/apache/ignite/internal/SecurityAwareBiPredicate.java
index 5ae1ee1f673..51f75b823ba 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/SecurityAwareBiPredicate.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/SecurityAwareBiPredicate.java
@@ -21,8 +21,8 @@ import java.security.AccessControlException;
import java.util.UUID;
import
org.apache.ignite.internal.processors.security.AbstractSecurityAwareExternalizable;
import org.apache.ignite.internal.processors.security.IgniteSecurity;
-import org.apache.ignite.internal.processors.security.OperationSecurityContext;
import org.apache.ignite.internal.processors.security.sandbox.IgniteSandbox;
+import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.lang.IgniteBiPredicate;
/**
@@ -52,7 +52,7 @@ public class SecurityAwareBiPredicate<E1, E2> extends
AbstractSecurityAwareExter
@Override public boolean apply(E1 e1, E2 e2) {
IgniteSecurity security = ignite.context().security();
- try (OperationSecurityContext c = security.withContext(subjectId)) {
+ try (Scope ignored = security.withContext(subjectId)) {
IgniteSandbox sandbox = security.sandbox();
return sandbox.enabled() ? sandbox.execute(() ->
original.apply(e1, e2)) : original.apply(e1, e2);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/SecurityAwarePredicate.java
b/modules/core/src/main/java/org/apache/ignite/internal/SecurityAwarePredicate.java
index 4c412db01ae..60723b00e8f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/SecurityAwarePredicate.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/SecurityAwarePredicate.java
@@ -21,8 +21,8 @@ import java.security.AccessControlException;
import java.util.UUID;
import
org.apache.ignite.internal.processors.security.AbstractSecurityAwareExternalizable;
import org.apache.ignite.internal.processors.security.IgniteSecurity;
-import org.apache.ignite.internal.processors.security.OperationSecurityContext;
import org.apache.ignite.internal.processors.security.sandbox.IgniteSandbox;
+import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.lang.IgnitePredicate;
/**
@@ -52,7 +52,7 @@ public class SecurityAwarePredicate<E> extends
AbstractSecurityAwareExternalizab
@Override public boolean apply(E evt) {
IgniteSecurity security = ignite.context().security();
- try (OperationSecurityContext c = security.withContext(subjectId)) {
+ try (Scope ignored = security.withContext(subjectId)) {
IgniteSandbox sandbox = security.sandbox();
return sandbox.enabled() ? sandbox.execute(() ->
original.apply(evt)) : original.apply(evt);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/management/consistency/ConsistencyRepairTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/management/consistency/ConsistencyRepairTask.java
index 0a46c30d73b..8cbdf158458 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/management/consistency/ConsistencyRepairTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/management/consistency/ConsistencyRepairTask.java
@@ -40,7 +40,7 @@ import
org.apache.ignite.internal.processors.cache.GridCacheContext;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import
org.apache.ignite.internal.processors.cache.distributed.near.consistency.IgniteIrreparableConsistencyViolationException;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
-import org.apache.ignite.internal.processors.security.OperationSecurityContext;
+import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.F;
@@ -273,7 +273,7 @@ public class ConsistencyRepairTask extends
AbstractConsistencyTask<ConsistencyRe
* @param keys Keys.
*/
private void repair(IgniteCache<Object, Object> cache, Set<Object>
keys) {
- try (OperationSecurityContext ignored =
ignite.context().security().withContext(ignite.localNode().id())) {
+ try (Scope ignored =
ignite.context().security().withContext(ignite.localNode().id())) {
cache.getAll(keys); // Repair.
}
catch (CacheException e) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 633ebef235b..dddb30bb6ae 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -99,12 +99,12 @@ import
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccess
import
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
import
org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
import org.apache.ignite.internal.processors.pool.PoolProcessor;
-import org.apache.ignite.internal.processors.security.OperationSecurityContext;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
import org.apache.ignite.internal.processors.tracing.Span;
import org.apache.ignite.internal.processors.tracing.SpanTags;
+import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
@@ -1876,7 +1876,7 @@ public class GridIoManager extends
GridManagerAdapter<CommunicationSpi<Object>>
UUID newSecSubjId = secSubjId != null ? secSubjId : nodeId;
- try (OperationSecurityContext s =
ctx.security().withContext(newSecSubjId)) {
+ try (Scope ignored = ctx.security().withContext(newSecSubjId)) {
lsnr.onMessage(nodeId, msg, plc);
}
finally {
@@ -3644,7 +3644,7 @@ public class GridIoManager extends
GridManagerAdapter<CommunicationSpi<Object>>
if (msgBody != null) {
if (predLsnr != null) {
- try (OperationSecurityContext s =
ctx.security().withContext(initNodeId)) {
+ try (Scope ignored =
ctx.security().withContext(initNodeId)) {
if (!predLsnr.apply(nodeId, msgBody))
removeMessageListener(TOPIC_COMM_USER, this);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index f3b93fe947e..de9a08f18f3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -91,12 +91,12 @@ import
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import
org.apache.ignite.internal.processors.cluster.IGridClusterStateProcessor;
import org.apache.ignite.internal.processors.security.IgniteSecurity;
-import org.apache.ignite.internal.processors.security.OperationSecurityContext;
import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.processors.tracing.messages.SpanContainer;
import org.apache.ignite.internal.systemview.ClusterNodeViewWalker;
import org.apache.ignite.internal.systemview.NodeAttributeViewWalker;
import org.apache.ignite.internal.systemview.NodeMetricsViewWalker;
+import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
import org.apache.ignite.internal.util.GridSpinBusyLock;
@@ -938,7 +938,7 @@ public class GridDiscoveryManager extends
GridManagerAdapter<DiscoverySpi> {
if (customMsg instanceof
SecurityAwareCustomMessageWrapper) {
UUID secSubjId =
((SecurityAwareCustomMessageWrapper)customMsg).securitySubjectId();
- try (OperationSecurityContext ignored =
ctx.security().withContext(secSubjId)) {
+ try (Scope ignored =
ctx.security().withContext(secSubjId)) {
super.run();
}
}
@@ -949,7 +949,7 @@ public class GridDiscoveryManager extends
GridManagerAdapter<DiscoverySpi> {
notification.getNode()
);
- try (OperationSecurityContext ignored =
ctx.security().withContext(initiatorNodeSecCtx)) {
+ try (Scope ignored =
ctx.security().withContext(initiatorNodeSecCtx)) {
super.run();
}
}
@@ -3119,7 +3119,7 @@ public class GridDiscoveryManager extends
GridManagerAdapter<DiscoverySpi> {
blockingSectionEnd();
}
- try (OperationSecurityContext ignored =
withRemoteSecurityContext(ctx, evt.secCtx)) {
+ try (Scope ignored = withRemoteSecurityContext(ctx, evt.secCtx)) {
int type = evt.type;
AffinityTopologyVersion topVer = evt.topVer;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index e0fd3c3ff42..90a9972779d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -112,9 +112,9 @@ import
org.apache.ignite.internal.processors.performancestatistics.OperationType
import
org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter;
import
org.apache.ignite.internal.processors.platform.client.cache.ImmutableArrayMap;
import
org.apache.ignite.internal.processors.platform.client.cache.ImmutableArraySet;
-import org.apache.ignite.internal.processors.security.OperationSecurityContext;
import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.thread.context.Scope;
import
org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
import
org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
@@ -3914,7 +3914,7 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
ctx.operationContextPerCall(opCtx);
ctx.shared().txContextReset();
- try (OperationSecurityContext ignored =
ctx.kernalContext().security().withContext(secCtx)) {
+ try (Scope ignored =
ctx.kernalContext().security().withContext(secCtx)) {
opFut = op.op(tx0).chain(clo);
}
catch (Throwable e) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index bbc2e78bbb4..5918023afff 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -108,11 +108,11 @@ import
org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.internal.processors.metric.impl.BooleanMetricImpl;
import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl;
import
org.apache.ignite.internal.processors.query.schema.SchemaNodeLeaveExchangeWorkerTask;
-import org.apache.ignite.internal.processors.security.OperationSecurityContext;
import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.processors.tracing.Span;
import org.apache.ignite.internal.processors.tracing.SpanTags;
+import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.internal.util.GridListSet;
import org.apache.ignite.internal.util.GridPartitionStateMap;
import org.apache.ignite.internal.util.GridStringBuilder;
@@ -3065,7 +3065,7 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
if (task == null)
continue; // Main while loop.
- try (OperationSecurityContext c =
withRemoteSecurityContext(cctx.kernalContext(), task.securityContext())) {
+ try (Scope ignored =
withRemoteSecurityContext(cctx.kernalContext(), task.securityContext())) {
if (!isExchangeTask(task)) {
processCustomTask(task);
@@ -3191,7 +3191,7 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
break;
}
- catch (IgniteFutureTimeoutCheckedException
ignored) {
+ catch (IgniteFutureTimeoutCheckedException
ignoredEx) {
updateHeartbeat();
if (nextDumpTime <= U.currentTimeMillis())
{
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java
index 96c6d8e8aff..6b420ad7f14 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java
@@ -52,8 +52,8 @@ import org.apache.ignite.internal.cluster.DetachedClusterNode;
import
org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
import org.apache.ignite.internal.processors.query.QuerySchemaPatch;
import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.processors.security.OperationSecurityContext;
import org.apache.ignite.internal.processors.security.SecurityContext;
+import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -153,7 +153,7 @@ public class ValidationOnNodeJoinUtils {
for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo :
nodeData.caches().values()) {
if (secCtx != null && cacheInfo.cacheType() == CacheType.USER)
{
- try (OperationSecurityContext s =
ctx.security().withContext(secCtx)) {
+ try (Scope ignored = ctx.security().withContext(secCtx)) {
GridCacheProcessor.authorizeCacheCreate(ctx.security(),
cacheInfo.cacheData().config());
}
catch (SecurityException ex) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareFilter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareFilter.java
index 1f15faa34d8..91d53798b0e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareFilter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareFilter.java
@@ -25,8 +25,8 @@ import javax.cache.event.CacheEntryListenerException;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import
org.apache.ignite.internal.processors.security.AbstractSecurityAwareExternalizable;
import org.apache.ignite.internal.processors.security.IgniteSecurity;
-import org.apache.ignite.internal.processors.security.OperationSecurityContext;
import org.apache.ignite.internal.processors.security.sandbox.IgniteSandbox;
+import org.apache.ignite.internal.thread.context.Scope;
/**
* Security aware remote filter.
@@ -60,7 +60,7 @@ public class SecurityAwareFilter<K, V> extends
AbstractSecurityAwareExternalizab
IgniteSecurity security = ignite.context().security();
- try (OperationSecurityContext c = security.withContext(subjectId)) {
+ try (Scope ignored = security.withContext(subjectId)) {
IgniteSandbox sandbox = security.sandbox();
return sandbox.enabled() ? sandbox.execute(() ->
original.evaluate(evt)) : original.evaluate(evt);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareTransformerFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareTransformerFactory.java
index 8e4e8585304..9e958a4ff5a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareTransformerFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareTransformerFactory.java
@@ -22,8 +22,8 @@ import java.util.UUID;
import javax.cache.configuration.Factory;
import
org.apache.ignite.internal.processors.security.AbstractSecurityAwareExternalizable;
import org.apache.ignite.internal.processors.security.IgniteSecurity;
-import org.apache.ignite.internal.processors.security.OperationSecurityContext;
import org.apache.ignite.internal.processors.security.sandbox.IgniteSandbox;
+import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.lang.IgniteClosure;
/**
@@ -58,7 +58,7 @@ public class SecurityAwareTransformerFactory<E, R> extends
@Override public R apply(E e) {
IgniteSecurity security = ignite.context().security();
- try (OperationSecurityContext c =
security.withContext(subjectId)) {
+ try (Scope ignored = security.withContext(subjectId)) {
IgniteSandbox sandbox = security.sandbox();
return sandbox.enabled() ? sandbox.execute(() ->
cl.apply(e)) : cl.apply(e);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index 3f481dd8eff..90f32db551f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -50,12 +50,12 @@ import
org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import
org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
-import org.apache.ignite.internal.processors.security.OperationSecurityContext;
import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.processors.security.SecurityUtils;
import
org.apache.ignite.internal.processors.service.GridServiceNotFoundException;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
@@ -540,7 +540,7 @@ public class GridJobWorker extends GridWorker implements
GridTimeoutObject {
SqlFieldsQuery.setThreadedQueryInitiatorId("task:" + ses.getTaskName()
+ ":" + getJobId());
- try (OperationSecurityContext ignored =
ctx.security().withContext(secCtx)) {
+ try (Scope ignored = ctx.security().withContext(secCtx)) {
if (partsReservation != null) {
try {
if (!partsReservation.reserve()) {
@@ -760,7 +760,7 @@ public class GridJobWorker extends GridWorker implements
GridTimeoutObject {
status = CANCELLED;
U.wrapThreadLoader(dep.classLoader(), (IgniteRunnable)() -> {
- try (OperationSecurityContext c =
ctx.security().withContext(secCtx)) {
+ try (Scope ignored = ctx.security().withContext(secCtx)) {
job0.cancel();
}
});
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
index f56867a0eeb..3d6f6ab75c3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
@@ -39,7 +39,7 @@ import
org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext;
import org.apache.ignite.internal.processors.odbc.odbc.OdbcConnectionContext;
import
org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.client.ClientStatus;
-import org.apache.ignite.internal.processors.security.OperationSecurityContext;
+import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
import org.apache.ignite.internal.util.nio.GridNioSession;
@@ -230,7 +230,7 @@ public class ClientListenerNioListener extends
GridNioServerListenerAdapter<Clie
ClientListenerResponse resp;
- try (OperationSecurityContext ignored =
ctx.security().withContext(connCtx.securityContext())) {
+ try (Scope ignored =
ctx.security().withContext(connCtx.securityContext())) {
resp = hnd.handle(req);
}
@@ -569,7 +569,7 @@ public class ClientListenerNioListener extends
GridNioServerListenerAdapter<Clie
// When security is enabled, only an administrator can connect and
execute commands.
if (connCtx.securityContext() != null) {
- try (OperationSecurityContext ignored =
ctx.security().withContext(connCtx.securityContext())) {
+ try (Scope ignored =
ctx.security().withContext(connCtx.securityContext())) {
ctx.security().authorize(SecurityPermission.ADMIN_OPS);
}
catch (SecurityException e) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
index 7c40e9d583c..79b31b9f072 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
@@ -43,12 +43,12 @@ import
org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
import org.apache.ignite.internal.processors.security.IgniteSecurity;
-import
org.apache.ignite.internal.processors.security.thread.SecurityAwareIoPool;
-import
org.apache.ignite.internal.processors.security.thread.SecurityAwareStripedExecutor;
-import
org.apache.ignite.internal.processors.security.thread.SecurityAwareStripedThreadPoolExecutor;
-import
org.apache.ignite.internal.processors.security.thread.SecurityAwareThreadPoolExecutor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.systemview.StripedExecutorTaskViewWalker;
+import org.apache.ignite.internal.thread.pool.OperationContextAwareIoPool;
+import
org.apache.ignite.internal.thread.pool.OperationContextAwareStripedExecutor;
+import
org.apache.ignite.internal.thread.pool.OperationContextAwareStripedThreadPoolExecutor;
+import
org.apache.ignite.internal.thread.pool.OperationContextAwareThreadPoolExecutor;
import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
@@ -280,7 +280,7 @@ public class PoolProcessor extends GridProcessorAdapter {
throw new IgniteException("Failed to register IO
executor pool because its ID as " +
"already used: " + id);
- extPools[id] = ctx.security().enabled() ? new
SecurityAwareIoPool(ctx.security(), ex) : ex;
+ extPools[id] = ctx.security().enabled() ?
OperationContextAwareIoPool.wrap(ex) : ex;
}
}
}
@@ -1216,8 +1216,7 @@ public class PoolProcessor extends GridProcessorAdapter {
long keepAliveTime
) {
return ctx.security().enabled()
- ? new SecurityAwareStripedThreadPoolExecutor(
- ctx.security(),
+ ? new OperationContextAwareStripedThreadPoolExecutor(
concurrentLvl,
igniteInstanceName,
threadNamePrefix,
@@ -1245,8 +1244,7 @@ public class PoolProcessor extends GridProcessorAdapter {
long failureDetectionTimeout
) {
return ctx.security().enabled()
- ? new SecurityAwareStripedExecutor(
- ctx.security(),
+ ? new OperationContextAwareStripedExecutor(
cnt,
igniteInstanceName,
poolName,
@@ -1270,8 +1268,7 @@ public class PoolProcessor extends GridProcessorAdapter {
UncaughtExceptionHandler eHnd
) {
return ctx.security().enabled()
- ? new SecurityAwareThreadPoolExecutor(
- ctx.security(),
+ ? new OperationContextAwareThreadPoolExecutor(
threadNamePrefix,
igniteInstanceName,
corePoolSize,
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
index 0c521ceef9b..f26b3af040e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
@@ -68,8 +68,8 @@ import
org.apache.ignite.internal.processors.rest.request.GridRestNodeStateBefor
import org.apache.ignite.internal.processors.rest.request.GridRestRequest;
import org.apache.ignite.internal.processors.rest.request.GridRestTaskRequest;
import org.apache.ignite.internal.processors.rest.request.RestQueryRequest;
-import org.apache.ignite.internal.processors.security.OperationSecurityContext;
import org.apache.ignite.internal.processors.security.SecurityContext;
+import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.typedef.C1;
@@ -290,7 +290,7 @@ public class GridRestProcessor extends GridProcessorAdapter
implements IgniteRes
if (secCtx0 == null || ses.isTokenExpired(sesTokTtl))
ses.secCtx = secCtx0 = authenticate(req, ses);
- try (OperationSecurityContext s =
ctx.security().withContext(secCtx0)) {
+ try (Scope ignored = ctx.security().withContext(secCtx0)) {
authorize(req);
return handleRequest0(req);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/IgniteSecurity.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/IgniteSecurity.java
index bdcaa966de1..b673aaa383d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/IgniteSecurity.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/IgniteSecurity.java
@@ -22,6 +22,7 @@ import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.security.sandbox.IgniteSandbox;
+import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.plugin.security.AuthenticationContext;
import org.apache.ignite.plugin.security.SecurityCredentials;
import org.apache.ignite.plugin.security.SecurityException;
@@ -42,30 +43,30 @@ import org.apache.ignite.plugin.security.SecuritySubject;
*/
public interface IgniteSecurity {
/**
- * Creates {@link OperationSecurityContext}. All calls of methods {@link
#authorize(String, SecurityPermission)} or {@link
+ * Creates {@link Scope}. All calls of methods {@link #authorize(String,
SecurityPermission)} or {@link
* #authorize(SecurityPermission)} will be processed into the context of
passed {@link SecurityContext} until
- * holder {@link OperationSecurityContext} will be closed.
+ * holder {@link Scope} will be closed.
*
* @param secCtx Security Context.
* @return Security context holder.
*/
- public OperationSecurityContext withContext(SecurityContext secCtx);
+ public Scope withContext(SecurityContext secCtx);
/**
- * Creates {@link OperationSecurityContext}. All calls of methods {@link
#authorize(String, SecurityPermission)} or {@link
+ * Creates {@link Scope}. All calls of methods {@link #authorize(String,
SecurityPermission)} or {@link
* #authorize(SecurityPermission)} will be processed into the context of
{@link SecurityContext} that is owned by
- * the node with given nodeId until holder {@link
OperationSecurityContext} will be closed.
+ * the node with given nodeId until holder {@link Scope} will be closed.
*
* @param nodeId Node id.
* @return Security context holder.
*/
- public OperationSecurityContext withContext(UUID nodeId);
+ public Scope withContext(UUID nodeId);
/** @return {@code True} if current thread executed in default security
context. */
public boolean isDefaultContext();
/**
- * @return SecurityContext of holder {@link OperationSecurityContext}.
+ * @return SecurityContext of holder {@link Scope}.
*/
public SecurityContext securityContext();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/IgniteSecurityProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/IgniteSecurityProcessor.java
index 39721c717f1..7b34ed75db2 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/IgniteSecurityProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/IgniteSecurityProcessor.java
@@ -32,6 +32,9 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import
org.apache.ignite.internal.processors.security.sandbox.AccessControllerSandbox;
import org.apache.ignite.internal.processors.security.sandbox.IgniteSandbox;
import org.apache.ignite.internal.processors.security.sandbox.NoOpSandbox;
+import org.apache.ignite.internal.thread.context.OperationContext;
+import org.apache.ignite.internal.thread.context.OperationContextAttribute;
+import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
@@ -85,8 +88,8 @@ public class IgniteSecurityProcessor extends
IgniteSecurityAdapter {
return SANDBOXED_NODES_COUNTER.get() > 0;
}
- /** Current security context if differs from {@link #dfltSecCtx}. */
- private final ThreadLocal<SecurityContext> curSecCtx = new ThreadLocal<>();
+ /** Context attribute that holds Security Context. */
+ private static final OperationContextAttribute<SecurityContext> SEC_CTX =
OperationContextAttribute.newInstance();
/** Security processor. */
private final GridSecurityProcessor secPrc;
@@ -106,13 +109,6 @@ public class IgniteSecurityProcessor extends
IgniteSecurityAdapter {
/** Default security context. */
private volatile SecurityContext dfltSecCtx;
- /** Default operation security context for the case when current and new
contexts are default. */
- private final OperationSecurityContext dfltOpCtx = new
OperationSecurityContext(this, null) {
- @Override public void close() {
- // No-op.
- }
- };
-
/**
* @param ctx Grid kernal context.
* @param secPrc Security processor.
@@ -129,25 +125,12 @@ public class IgniteSecurityProcessor extends
IgniteSecurityAdapter {
}
/** {@inheritDoc} */
- @Override public OperationSecurityContext withContext(SecurityContext
secCtx) {
- assert secCtx != null;
-
- SecurityContext dflt = dfltSecCtx;
- SecurityContext cur = curSecCtx.get();
-
- boolean isNewCtxDflt = secCtx == dflt;
- boolean isCurCtxDflt = cur == null;
-
- if (isCurCtxDflt && isNewCtxDflt)
- return dfltOpCtx;
-
- curSecCtx.set(isNewCtxDflt ? null : secCtx);
-
- return new OperationSecurityContext(this, isCurCtxDflt ? null : cur);
+ @Override public Scope withContext(SecurityContext secCtx) {
+ return OperationContext.set(SEC_CTX, secCtx == dfltSecCtx ? null :
secCtx);
}
/** {@inheritDoc} */
- @Override public OperationSecurityContext withContext(UUID subjId) {
+ @Override public Scope withContext(UUID subjId) {
try {
SecurityContext res = secPrc.securityContext(subjId);
@@ -187,19 +170,14 @@ public class IgniteSecurityProcessor extends
IgniteSecurityAdapter {
);
}
- /** Restores local node context for the current thread. */
- void restoreDefaultContext() {
- curSecCtx.set(null);
- }
-
/** {@inheritDoc} */
@Override public boolean isDefaultContext() {
- return curSecCtx.get() == null;
+ return OperationContext.get(SEC_CTX) == null;
}
/** {@inheritDoc} */
@Override public SecurityContext securityContext() {
- SecurityContext res = curSecCtx.get();
+ SecurityContext res = OperationContext.get(SEC_CTX);
return res == null ? dfltSecCtx : res;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/NoOpIgniteSecurityProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/NoOpIgniteSecurityProcessor.java
index 0dc6151cf6c..4ad28405ce3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/NoOpIgniteSecurityProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/NoOpIgniteSecurityProcessor.java
@@ -25,6 +25,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.security.sandbox.IgniteSandbox;
import org.apache.ignite.internal.processors.security.sandbox.NoOpSandbox;
+import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.plugin.security.AuthenticationContext;
import org.apache.ignite.plugin.security.SecurityCredentials;
import org.apache.ignite.plugin.security.SecurityException;
@@ -44,13 +45,6 @@ public class NoOpIgniteSecurityProcessor extends
IgniteSecurityAdapter {
/** Error message that occurs when trying to perform security operations
if security disabled. */
public static final String SECURITY_DISABLED_ERROR_MSG = "Operation cannot
be performed: Ignite security disabled.";
- /** No operation security context. */
- private final OperationSecurityContext opSecCtx = new
OperationSecurityContext(this, null) {
- @Override public void close() {
- // No-op.
- }
- };
-
/** Instance of IgniteSandbox. */
private final IgniteSandbox sandbox = new NoOpSandbox();
@@ -62,13 +56,13 @@ public class NoOpIgniteSecurityProcessor extends
IgniteSecurityAdapter {
}
/** {@inheritDoc} */
- @Override public OperationSecurityContext withContext(SecurityContext
secCtx) {
- return opSecCtx;
+ @Override public Scope withContext(SecurityContext secCtx) {
+ return Scope.NOOP_SCOPE;
}
/** {@inheritDoc} */
- @Override public OperationSecurityContext withContext(UUID nodeId) {
- return opSecCtx;
+ @Override public Scope withContext(UUID nodeId) {
+ return Scope.NOOP_SCOPE;
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/SecurityUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/SecurityUtils.java
index 24dfa4ca012..0911df2a584 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/SecurityUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/SecurityUtils.java
@@ -47,6 +47,7 @@ import
org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import
org.apache.ignite.internal.processors.security.sandbox.IgniteDomainCombiner;
import org.apache.ignite.internal.processors.security.sandbox.IgniteSandbox;
+import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -201,9 +202,9 @@ public class SecurityUtils {
* context change is needed.
* Note that this method is safe to use only when it is known to be called
in the security context of the local node
* (e.g. in system workers).
- * @return {@link OperationSecurityContext} instance if new security
context is set, otherwise {@code null}.
+ * @return {@link Scope} instance if new security context is set,
otherwise {@code null}.
*/
- public static OperationSecurityContext
withRemoteSecurityContext(GridKernalContext ctx, SecurityContext secCtx) {
+ public static Scope withRemoteSecurityContext(GridKernalContext ctx,
SecurityContext secCtx) {
if (secCtx == null)
return null;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareCallable.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareCallable.java
deleted file mode 100644
index 43845f198f8..00000000000
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareCallable.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.security.thread;
-
-import java.util.Collection;
-import java.util.concurrent.Callable;
-import java.util.stream.Collectors;
-import org.apache.ignite.internal.processors.security.IgniteSecurity;
-import org.apache.ignite.internal.processors.security.OperationSecurityContext;
-import org.apache.ignite.internal.processors.security.SecurityContext;
-
-/**
- * Represents a {@link Callable} wrapper that executes the original {@link
Callable} with the security context
- * current at the time the wrapper was created.
- */
-class SecurityAwareCallable<T> implements Callable<T> {
- /** Original callable. */
- private final Callable<T> delegate;
-
- /** */
- private final IgniteSecurity security;
-
- /** */
- private final SecurityContext secCtx;
-
- /** */
- private SecurityAwareCallable(IgniteSecurity security, Callable<T>
delegate) {
- assert security.enabled();
- assert delegate != null;
-
- this.delegate = delegate;
- this.security = security;
- secCtx = security.securityContext();
- }
-
- /** {@inheritDoc} */
- @Override public T call() throws Exception {
- try (OperationSecurityContext ignored = security.withContext(secCtx)) {
- return delegate.call();
- }
- }
-
- /** */
- static <A> Callable<A> of(IgniteSecurity sec, Callable<A> delegate) {
- if (delegate == null || sec.isDefaultContext())
- return delegate;
-
- return new SecurityAwareCallable<>(sec, delegate);
- }
-
- /** */
- static <A> Collection<? extends Callable<A>> of(
- IgniteSecurity sec,
- Collection<? extends Callable<A>> tasks
- ) {
- if (tasks == null || sec.isDefaultContext())
- return tasks;
-
- return tasks.stream().map(t -> t == null ? null : new
SecurityAwareCallable<>(sec, t)).collect(Collectors.toList());
- }
-}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareRunnable.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareRunnable.java
deleted file mode 100644
index 4ea8765d147..00000000000
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareRunnable.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.security.thread;
-
-import org.apache.ignite.internal.processors.security.IgniteSecurity;
-import org.apache.ignite.internal.processors.security.OperationSecurityContext;
-import org.apache.ignite.internal.processors.security.SecurityContext;
-
-/**
- * Represents a {@link Runnable} wrapper that executes the original {@link
Runnable} with the security context
- * current at the time the wrapper was created.
- */
-class SecurityAwareRunnable implements Runnable {
- /** */
- private final Runnable delegate;
-
- /** */
- private final IgniteSecurity security;
-
- /** */
- private final SecurityContext secCtx;
-
- /** */
- private SecurityAwareRunnable(IgniteSecurity security, Runnable delegate) {
- assert security.enabled();
- assert delegate != null;
-
- this.delegate = delegate;
- this.security = security;
- secCtx = security.securityContext();
- }
-
- /** {@inheritDoc} */
- @Override public void run() {
- try (OperationSecurityContext ignored = security.withContext(secCtx)) {
- delegate.run();
- }
- }
-
- /** */
- static Runnable of(IgniteSecurity security, Runnable delegate) {
- if (delegate == null || security.isDefaultContext())
- return delegate;
-
- return new SecurityAwareRunnable(security, delegate);
- }
-}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
index aed2ac33acb..da4189d8f2e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
@@ -70,9 +70,9 @@ import
org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupp
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.internal.processors.platform.services.PlatformService;
import
org.apache.ignite.internal.processors.platform.services.PlatformServiceConfiguration;
-import org.apache.ignite.internal.processors.security.OperationSecurityContext;
import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.systemview.ServiceViewWalker;
+import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -2080,7 +2080,7 @@ public class IgniteServiceProcessor extends
GridProcessorAdapter implements Igni
return err;
}
- try (OperationSecurityContext ignored =
ctx.security().withContext(secCtx)) {
+ try (Scope ignored = ctx.security().withContext(secCtx)) {
for (ServiceInfo desc : svcs) {
SecurityException err = checkPermissions(desc.name(),
SERVICE_DEPLOY);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareIoPool.java
b/modules/core/src/main/java/org/apache/ignite/internal/thread/pool/OperationContextAwareIoPool.java
similarity index 58%
rename from
modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareIoPool.java
rename to
modules/core/src/main/java/org/apache/ignite/internal/thread/pool/OperationContextAwareIoPool.java
index c9e2359c1ac..6768d4a21de 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareIoPool.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/thread/pool/OperationContextAwareIoPool.java
@@ -15,18 +15,14 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.security.thread;
+package org.apache.ignite.internal.thread.pool;
import java.util.concurrent.Executor;
-import org.apache.ignite.internal.processors.security.IgniteSecurity;
+import
org.apache.ignite.internal.thread.context.concurrent.OperationContextAwareExecutor;
import org.apache.ignite.plugin.extensions.communication.IoPool;
-import org.jetbrains.annotations.NotNull;
-
-/** Wrapper of {@link IoPool} that executes tasks in security context that was
actual when task was added to pool queue. */
-public class SecurityAwareIoPool implements IoPool {
- /** */
- private final IgniteSecurity security;
+/** */
+public class OperationContextAwareIoPool implements IoPool {
/** */
private final IoPool delegate;
@@ -34,20 +30,9 @@ public class SecurityAwareIoPool implements IoPool {
private final Executor executor;
/** */
- public SecurityAwareIoPool(IgniteSecurity security, IoPool delegate) {
- assert security.enabled();
- assert delegate != null;
-
- this.security = security;
+ private OperationContextAwareIoPool(IoPool delegate) {
this.delegate = delegate;
-
- final Executor delegateExecutor = delegate.executor();
-
- executor = delegateExecutor == null ? null : new Executor() {
- @Override public void execute(@NotNull Runnable cmd) {
-
delegateExecutor.execute(SecurityAwareRunnable.of(SecurityAwareIoPool.this.security,
cmd));
- }
- };
+ this.executor =
OperationContextAwareExecutor.wrap(delegate.executor());
}
/** {@inheritDoc} */
@@ -59,4 +44,9 @@ public class SecurityAwareIoPool implements IoPool {
@Override public Executor executor() {
return executor;
}
+
+ /** */
+ public static IoPool wrap(IoPool pool) {
+ return pool == null ? null : new OperationContextAwareIoPool(pool);
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareStripedExecutor.java
b/modules/core/src/main/java/org/apache/ignite/internal/thread/pool/OperationContextAwareStripedExecutor.java
similarity index 59%
rename from
modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareStripedExecutor.java
rename to
modules/core/src/main/java/org/apache/ignite/internal/thread/pool/OperationContextAwareStripedExecutor.java
index f2239716cd8..268776a4d63 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareStripedExecutor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/thread/pool/OperationContextAwareStripedExecutor.java
@@ -15,42 +15,19 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.security.thread;
+package org.apache.ignite.internal.thread.pool;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.processors.security.IgniteSecurity;
+import
org.apache.ignite.internal.thread.context.function.OperationContextAwareRunnable;
import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.worker.GridWorkerListener;
import org.apache.ignite.lang.IgniteInClosure;
import org.jetbrains.annotations.NotNull;
-/**
- * Extends {@link StripedExecutor} with the ability to execute tasks in
security context that was actual when task was
- * added to executor's queue.
- */
-public class SecurityAwareStripedExecutor extends StripedExecutor {
- /** */
- private final IgniteSecurity security;
-
- /** */
- public SecurityAwareStripedExecutor(
- IgniteSecurity security,
- int cnt,
- String igniteInstanceName,
- String poolName,
- IgniteLogger log,
- IgniteInClosure<Throwable> errHnd,
- GridWorkerListener gridWorkerLsnr,
- long failureDetectionTimeout
- ) {
- super(cnt, igniteInstanceName, poolName, log, errHnd, gridWorkerLsnr,
failureDetectionTimeout);
-
- this.security = security;
- }
-
+/** */
+public class OperationContextAwareStripedExecutor extends StripedExecutor {
/** */
- public SecurityAwareStripedExecutor(
- IgniteSecurity security,
+ public OperationContextAwareStripedExecutor(
int cnt,
String igniteInstanceName,
String poolName,
@@ -61,17 +38,15 @@ public class SecurityAwareStripedExecutor extends
StripedExecutor {
long failureDetectionTimeout
) {
super(cnt, igniteInstanceName, poolName, log, errHnd, stealTasks,
gridWorkerLsnr, failureDetectionTimeout);
-
- this.security = security;
}
/** {@inheritDoc} */
- @Override public void execute(int idx, Runnable cmd) {
- super.execute(idx, SecurityAwareRunnable.of(security, cmd));
+ @Override public void execute(@NotNull Runnable cmd) {
+
super.execute(OperationContextAwareRunnable.wrapIfContextNotEmpty(cmd));
}
/** {@inheritDoc} */
- @Override public void execute(@NotNull Runnable cmd) {
- super.execute(SecurityAwareRunnable.of(security, cmd));
+ @Override public void execute(int idx, Runnable cmd) {
+ super.execute(idx,
OperationContextAwareRunnable.wrapIfContextNotEmpty(cmd));
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareStripedThreadPoolExecutor.java
b/modules/core/src/main/java/org/apache/ignite/internal/thread/pool/OperationContextAwareStripedThreadPoolExecutor.java
similarity index 59%
rename from
modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareStripedThreadPoolExecutor.java
rename to
modules/core/src/main/java/org/apache/ignite/internal/thread/pool/OperationContextAwareStripedThreadPoolExecutor.java
index ff80f2972a8..5b70bfcc21f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareStripedThreadPoolExecutor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/thread/pool/OperationContextAwareStripedThreadPoolExecutor.java
@@ -15,35 +15,27 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.security.thread;
+package org.apache.ignite.internal.thread.pool;
-import org.apache.ignite.internal.processors.security.IgniteSecurity;
+import
org.apache.ignite.internal.thread.context.function.OperationContextAwareRunnable;
import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
-/**
- * Extends {@link IgniteStripedThreadPoolExecutor} with the ability to execute
tasks in security context that was actual
- * when task was added to executor's queue.
- */
-public class SecurityAwareStripedThreadPoolExecutor extends
IgniteStripedThreadPoolExecutor {
- /** */
- private final IgniteSecurity security;
-
+/** */
+public class OperationContextAwareStripedThreadPoolExecutor extends
IgniteStripedThreadPoolExecutor {
/** */
- public SecurityAwareStripedThreadPoolExecutor(
- IgniteSecurity security,
- int concurrentLvl,
- String igniteInstanceName,
+ public OperationContextAwareStripedThreadPoolExecutor(
+ int concurrentLvl,
+ String igniteInstanceName,
String threadNamePrefix,
- Thread.UncaughtExceptionHandler eHnd,
- boolean allowCoreThreadTimeOut,
+ Thread.UncaughtExceptionHandler eHnd,
+ boolean allowCoreThreadTimeOut,
long keepAliveTime
) {
super(concurrentLvl, igniteInstanceName, threadNamePrefix, eHnd,
allowCoreThreadTimeOut, keepAliveTime);
- this.security = security;
}
/** {@inheritDoc} */
@Override public void execute(Runnable task, int idx) {
- super.execute(SecurityAwareRunnable.of(security, task), idx);
+
super.execute(OperationContextAwareRunnable.wrapIfContextNotEmpty(task), idx);
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareThreadPoolExecutor.java
b/modules/core/src/main/java/org/apache/ignite/internal/thread/pool/OperationContextAwareThreadPoolExecutor.java
similarity index 65%
rename from
modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareThreadPoolExecutor.java
rename to
modules/core/src/main/java/org/apache/ignite/internal/thread/pool/OperationContextAwareThreadPoolExecutor.java
index 16c3216ab99..72888dd78a2 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareThreadPoolExecutor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/thread/pool/OperationContextAwareThreadPoolExecutor.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.security.thread;
+package org.apache.ignite.internal.thread.pool;
import java.util.Collection;
import java.util.List;
@@ -23,24 +23,17 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.ignite.internal.processors.security.IgniteSecurity;
+import
org.apache.ignite.internal.thread.context.function.OperationContextAwareCallable;
+import
org.apache.ignite.internal.thread.context.function.OperationContextAwareRunnable;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.NotNull;
-/**
- * Extends {@link ThreadPoolExecutor} with the ability to execute tasks in
security context that was actual when task was
- * added to executor's queue.
- */
-public class SecurityAwareThreadPoolExecutor extends IgniteThreadPoolExecutor {
- /** */
- private final IgniteSecurity security;
-
+/** */
+public class OperationContextAwareThreadPoolExecutor extends
IgniteThreadPoolExecutor {
/** */
- public SecurityAwareThreadPoolExecutor(
- IgniteSecurity security,
+ public OperationContextAwareThreadPoolExecutor(
String threadNamePrefix,
String igniteInstanceName,
int corePoolSize,
@@ -51,51 +44,51 @@ public class SecurityAwareThreadPoolExecutor extends
IgniteThreadPoolExecutor {
Thread.UncaughtExceptionHandler eHnd
) {
super(threadNamePrefix, igniteInstanceName, corePoolSize, maxPoolSize,
keepAliveTime, workQ, plc, eHnd);
-
- this.security = security;
}
/** {@inheritDoc} */
@NotNull @Override public <T> Future<T> submit(@NotNull Callable<T> task) {
- return super.submit(SecurityAwareCallable.of(security, task));
+ return
super.submit(OperationContextAwareCallable.wrapIfContextNotEmpty(task));
}
/** {@inheritDoc} */
@NotNull @Override public <T> Future<T> submit(@NotNull Runnable task, T
res) {
- return super.submit(SecurityAwareRunnable.of(security, task), res);
+ return
super.submit(OperationContextAwareRunnable.wrapIfContextNotEmpty(task), res);
}
/** {@inheritDoc} */
@NotNull @Override public Future<?> submit(@NotNull Runnable task) {
- return super.submit(SecurityAwareRunnable.of(security, task));
+ return
super.submit(OperationContextAwareRunnable.wrapIfContextNotEmpty(task));
}
/** {@inheritDoc} */
- @NotNull @Override public <T> List<Future<T>> invokeAll(
- @NotNull Collection<? extends Callable<T>> tasks) throws
InterruptedException {
- return super.invokeAll(SecurityAwareCallable.of(security, tasks));
+ @NotNull @Override public <T> List<Future<T>> invokeAll(@NotNull
Collection<? extends Callable<T>> tasks) throws InterruptedException {
+ return
super.invokeAll(OperationContextAwareCallable.wrapIfContextNotEmpty(tasks));
}
/** {@inheritDoc} */
- @NotNull @Override public <T> List<Future<T>> invokeAll(@NotNull
Collection<? extends Callable<T>> tasks,
- long timeout, @NotNull TimeUnit unit) throws InterruptedException {
- return super.invokeAll(SecurityAwareCallable.of(security, tasks),
timeout, unit);
+ @NotNull @Override public <T> List<Future<T>> invokeAll(
+ @NotNull Collection<? extends Callable<T>> tasks,
+ long timeout,
+ @NotNull TimeUnit unit
+ ) throws InterruptedException {
+ return
super.invokeAll(OperationContextAwareCallable.wrapIfContextNotEmpty(tasks),
timeout, unit);
}
/** {@inheritDoc} */
@NotNull @Override public <T> T invokeAny(@NotNull Collection<? extends
Callable<T>> tasks)
throws InterruptedException, ExecutionException {
- return super.invokeAny(SecurityAwareCallable.of(security, tasks));
+ return
super.invokeAny(OperationContextAwareCallable.wrapIfContextNotEmpty(tasks));
}
/** {@inheritDoc} */
@Override public <T> T invokeAny(@NotNull Collection<? extends
Callable<T>> tasks,
long timeout, @NotNull TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
- return super.invokeAny(SecurityAwareCallable.of(security, tasks),
timeout, unit);
+ return
super.invokeAny(OperationContextAwareCallable.wrapIfContextNotEmpty(tasks),
timeout, unit);
}
/** {@inheritDoc} */
@Override public void execute(@NotNull Runnable cmd) {
- super.execute(SecurityAwareRunnable.of(security, cmd));
+
super.execute(OperationContextAwareRunnable.wrapIfContextNotEmpty(cmd));
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorSelfTest.java
index 9976502b213..06fa3cc2e7d 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorSelfTest.java
@@ -29,8 +29,8 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.security.IgniteSecurity;
-import org.apache.ignite.internal.processors.security.OperationSecurityContext;
import org.apache.ignite.internal.processors.security.SecurityContext;
+import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.internal.util.lang.ConsumerX;
import org.apache.ignite.internal.util.lang.RunnableX;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -190,7 +190,7 @@ public class AuthenticationProcessorSelfTest extends
GridCommonAbstractTest {
for (int i = 0; i < NODES_COUNT; ++i) {
final int nodeIdx = i;
- try (OperationSecurityContext ignored =
grid(nodeIdx).context().security().withContext(secCtx)) {
+ try (Scope ignored =
grid(nodeIdx).context().security().withContext(secCtx)) {
GridTestUtils.assertThrows(log, () -> {
grid(nodeIdx).context().security().createUser("test1",
"test1".toCharArray());
@@ -462,7 +462,7 @@ public class AuthenticationProcessorSelfTest extends
GridCommonAbstractTest {
for (int i = 1; i < NODES_COUNT; i++) {
IgniteSecurity security = ignite(i).context().security();
- try (OperationSecurityContext ignored =
security.withContext(subj.id())) {
+ try (Scope ignored = security.withContext(subj.id())) {
SecuritySubject rmtSubj = security.securityContext().subject();
assertEquals(subj.id(), rmtSubj.id());
@@ -545,7 +545,7 @@ public class AuthenticationProcessorSelfTest extends
GridCommonAbstractTest {
assertNotNull(secCtx);
- try (OperationSecurityContext ignored =
ignite.context().security().withContext(secCtx)) {
+ try (Scope ignored =
ignite.context().security().withContext(secCtx)) {
action.accept(ignite.context().security());
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
new file mode 100644
index 00000000000..788321c9eff
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
@@ -0,0 +1,635 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread.context;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import
org.apache.ignite.internal.thread.pool.OperationContextAwareStripedExecutor;
+import
org.apache.ignite.internal.thread.pool.OperationContextAwareStripedThreadPoolExecutor;
+import
org.apache.ignite.internal.thread.pool.OperationContextAwareThreadPoolExecutor;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static
org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
+import static
org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+
+/** */
+public class OperationContextAttributesTest extends GridCommonAbstractTest {
+ /** */
+ private static final String DFLT_STR_VAL = "default";
+
+ /** */
+ private static final int DFLT_INT_VAL = -1;
+
+ /** */
+ private static final OperationContextAttribute<String> STR_ATTR =
OperationContextAttribute.newInstance(DFLT_STR_VAL);
+
+ /** */
+ private static final OperationContextAttribute<Integer> INT_ATTR =
OperationContextAttribute.newInstance(DFLT_INT_VAL);
+
+ /** */
+ private ExecutorService poolToShutdownAfterTest;
+
+ /** */
+ private int beforeTestReservedAttrIds;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ AttributeValueChecker.CHECKS.clear();
+
+ beforeTestReservedAttrIds = OperationContextAttribute.ID_GEN.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ if (poolToShutdownAfterTest != null)
+ poolToShutdownAfterTest.shutdownNow();
+
+ // Releases attribute IDs reserved during the test.
+ OperationContextAttribute.ID_GEN.set(beforeTestReservedAttrIds);
+ }
+
+ /** */
+ @Test
+ public void testNotAttachedAttribute() {
+ // No opened scope.
+ assertEquals(DFLT_STR_VAL, OperationContext.get(STR_ATTR));
+
+ // Scope opened but testing attribute is not set.
+ try (Scope ignored = OperationContext.set(INT_ATTR, 0)) {
+ assertEquals(DFLT_STR_VAL, OperationContext.get(STR_ATTR));
+ }
+ }
+
+ /** */
+ @Test
+ public void testAttachedAttribute() {
+ try (Scope ignored = OperationContext.set(STR_ATTR, "test")) {
+ assertEquals("test", OperationContext.get(STR_ATTR));
+ }
+ }
+
+ /** */
+ @Test
+ public void testAttributeValueSearchUpScopeStack() {
+ try (Scope ignored1 = OperationContext.set(STR_ATTR, "test1")) {
+ try (Scope ignored2 = OperationContext.set(INT_ATTR, 2)) {
+ checkAttributeValues("test1", 2);
+ }
+ }
+ }
+
+ /** */
+ @Test
+ public void testAttributeValueOverwrite() {
+ try (Scope ignored = OperationContext.set(STR_ATTR, "test1", INT_ATTR,
1, STR_ATTR, "test2")) {
+ checkAttributeValues("test2", 1);
+ }
+ }
+
+ /** */
+ @Test
+ public void testConsequentScopes() {
+ checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL);
+
+ try (Scope ignored1 = OperationContext.set(STR_ATTR, "test1",
INT_ATTR, 1)) {
+ checkAttributeValues("test1", 1);
+ }
+
+ checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL);
+
+ try (Scope ignored2 = OperationContext.set(INT_ATTR, 2)) {
+ checkAttributeValues(DFLT_STR_VAL, 2);
+ }
+
+ checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL);
+ }
+
+ /** */
+ @Test
+ public void testNestedScopes() {
+ checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL);
+
+ try (Scope ignored1 = OperationContext.set(INT_ATTR, 1)) {
+ checkAttributeValues(DFLT_STR_VAL, 1);
+
+ try (Scope ignored2 = OperationContext.set(STR_ATTR, "test2")) {
+ checkAttributeValues("test2", 1);
+ }
+
+ checkAttributeValues(DFLT_STR_VAL, 1);
+ }
+
+ checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL);
+ }
+
+ /** */
+ @Test
+ public void testNestedScopesAttributeValueOverwriteAndInheritance() {
+ checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL);
+
+ try (Scope ignored1 = OperationContext.set(INT_ATTR, 1, STR_ATTR,
"test1")) {
+ checkAttributeValues("test1", 1);
+
+ try (Scope ignored2 = OperationContext.set(STR_ATTR, "test2")) {
+ checkAttributeValues("test2", 1);
+ }
+
+ checkAttributeValues("test1", 1);
+ }
+
+ checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL);
+ }
+
+ /** */
+ @Test
+ public void testNullAttributeValue() {
+ checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL);
+
+ try (Scope ignored1 = OperationContext.set(INT_ATTR, null, STR_ATTR,
null)) {
+ checkAttributeValues(null, null);
+
+ try (Scope ignored2 = OperationContext.set(STR_ATTR, "test2")) {
+ checkAttributeValues("test2", null);
+
+ try (Scope ignored3 = OperationContext.set(STR_ATTR, null)) {
+ checkAttributeValues(null, null);
+ }
+
+ checkAttributeValues("test2", null);
+ }
+
+ checkAttributeValues(null, null);
+ }
+
+ checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL);
+ }
+
+ /** */
+ @Test
+ public void testScopeWithInitialAttributeValue() {
+ try (Scope scope1 = OperationContext.set(INT_ATTR, DFLT_INT_VAL,
STR_ATTR, DFLT_STR_VAL)) {
+ checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL);
+
+ assertTrue(scope1 == Scope.NOOP_SCOPE);
+
+ try (Scope scope2 = OperationContext.set(INT_ATTR, DFLT_INT_VAL)) {
+ checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL);
+
+ assertTrue(scope2 == Scope.NOOP_SCOPE);
+ }
+
+ checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL);
+ }
+ }
+
+ /** */
+ @Test
+ public void testNestedScopeWithTheSameAttributeValue() {
+ try (Scope ignored1 = OperationContext.set(INT_ATTR, 1)) {
+ checkAttributeValues(DFLT_STR_VAL, 1);
+
+ try (Scope scope = OperationContext.set(INT_ATTR, 1)) {
+ checkAttributeValues(DFLT_STR_VAL, 1);
+
+ assertTrue(scope == Scope.NOOP_SCOPE);
+ }
+
+ checkAttributeValues(DFLT_STR_VAL, 1);
+ }
+ }
+
+ /** */
+ @Test
+ public void testRuntimeAttributeCreation() {
+ try (Scope ignored1 = OperationContext.set(INT_ATTR, 1)) {
+ OperationContextAttribute<Object> attr =
OperationContextAttribute.newInstance();
+
+ assertNull(OperationContext.get(attr));
+
+ try (Scope ignored2 = OperationContext.set(attr, "test")) {
+ assertEquals("test", OperationContext.get(attr));
+ }
+
+ assertNull(OperationContext.get(attr));
+ }
+ }
+
+ /** */
+ @Test
+ public void testMaximumAttributesInstanceCount() {
+ int cnt = OperationContextAttribute.MAX_ATTR_CNT -
OperationContextAttribute.ID_GEN.get();
+
+ List<OperationContextAttribute<Integer>> attrs = new ArrayList<>(cnt);
+ LinkedList<Scope> scopes = new LinkedList<>();
+
+ for (int i = 0; i < cnt; i++) {
+ attrs.add(OperationContextAttribute.newInstance());
+
+ scopes.push(OperationContext.set(attrs.get(i), i));
+ }
+
+ try {
+ for (int i = 0; i < cnt; i++)
+ assertTrue(i == OperationContext.get(attrs.get(i)));
+ }
+ finally {
+ scopes.forEach(Scope::close);
+ }
+
+ assertTrue(attrs.stream().allMatch(attr -> OperationContext.get(attr)
== null));
+
+ assertThrowsAnyCause(
+ log,
+ OperationContextAttribute::newInstance,
+ AssertionError.class,
+ "Exceeded maximum supported number of created Attributes instances"
+ );
+ }
+
+ /** */
+ @Test
+ public void testUnorderedScopeClosing() {
+ Scope scope1 = OperationContext.set(INT_ATTR, 0);
+
+ try {
+ try (Scope ignored = OperationContext.set(STR_ATTR, "test")) {
+ assertThrowsWithCause(scope1::close, AssertionError.class);
+ }
+ }
+ finally {
+ scope1.close();
+ }
+
+ checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL);
+
+ assertThrowsWithCause(scope1::close, AssertionError.class);
+ }
+
+ /** */
+ @Test
+ public void testEmptySnapshot() {
+ checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL);
+
+ OperationContextSnapshot snapshot = OperationContext.createSnapshot();
+
+ try (Scope ignored = OperationContext.restoreSnapshot(snapshot)) {
+ checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL);
+ }
+
+ checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL);
+ }
+
+ /** */
+ @Test
+ public void testSnapshot() {
+ checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL);
+
+ OperationContextSnapshot snapshot;
+
+ try (Scope ignored = OperationContext.set(INT_ATTR, 1, STR_ATTR,
"test1")) {
+ checkAttributeValues("test1", 1);
+
+ snapshot = OperationContext.createSnapshot();
+ }
+
+ checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL);
+
+ try (Scope ignored = OperationContext.restoreSnapshot(snapshot)) {
+ checkAttributeValues("test1", 1);
+ }
+
+ checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL);
+ }
+
+ /** */
+ @Test
+ public void testNestedScopeSnapshot() {
+ OperationContextSnapshot snapshot;
+
+ try (Scope ignored1 = OperationContext.set(INT_ATTR, 1, STR_ATTR,
"test1")) {
+ try (Scope ignored2 = OperationContext.set(STR_ATTR, "test2")) {
+ checkAttributeValues("test2", 1);
+
+ snapshot = OperationContext.createSnapshot();
+ }
+ }
+
+ try (Scope ignored = OperationContext.restoreSnapshot(snapshot)) {
+ checkAttributeValues("test2", 1);
+ }
+
+ checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL);
+ }
+
+ /** */
+ @Test
+ public void testNestedScopeInSnapshotScope() {
+ OperationContextSnapshot snapshot0;
+
+ try (Scope ignored = OperationContext.set(INT_ATTR, 1, STR_ATTR,
"test1")) {
+ checkAttributeValues("test1", 1);
+
+ snapshot0 = OperationContext.createSnapshot();
+ }
+
+ OperationContextSnapshot snapshot1;
+
+ try (Scope ignored1 = OperationContext.restoreSnapshot(snapshot0)) {
+ checkAttributeValues("test1", 1);
+
+ try (Scope ignored2 = OperationContext.set(INT_ATTR, 2)) {
+ checkAttributeValues("test1", 2);
+
+ snapshot1 = OperationContext.createSnapshot();
+ }
+
+ checkAttributeValues("test1", 1);
+ }
+
+ try (Scope ignored0 = OperationContext.restoreSnapshot(snapshot1)) {
+ checkAttributeValues("test1", 2);
+ }
+
+ checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL);
+ }
+
+ /** */
+ @Test
+ public void testSnapshotRestoreInExistingScope() {
+ OperationContextSnapshot snapshot;
+
+ try (Scope ignored = OperationContext.set(STR_ATTR, "test1")) {
+ checkAttributeValues("test1", DFLT_INT_VAL);
+
+ snapshot = OperationContext.createSnapshot();
+ }
+
+ try (Scope ignored1 = OperationContext.set(INT_ATTR, 1)) {
+ checkAttributeValues(DFLT_STR_VAL, 1);
+
+ // Note, snapshot restores the state of the entire context,
including attributes that do not have a value set.
+ try (Scope ignored2 = OperationContext.restoreSnapshot(snapshot)) {
+ checkAttributeValues("test1", DFLT_INT_VAL);
+ }
+
+ checkAttributeValues(DFLT_STR_VAL, 1);
+ }
+
+ checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL);
+ }
+
+ /** */
+ @Test
+ public void testSnapshotNotAffectedByConsequentContextUpdates() {
+ checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL);
+
+ OperationContextSnapshot snapshot;
+
+ try (Scope ignored0 = OperationContext.set(INT_ATTR, 1)) {
+ checkAttributeValues(DFLT_STR_VAL, 1);
+
+ snapshot = OperationContext.createSnapshot();
+
+ try (Scope ignored1 = OperationContext.set(STR_ATTR, "test")) {
+ checkAttributeValues("test", 1);
+
+ try (Scope ignored =
OperationContext.restoreSnapshot(snapshot)) {
+ checkAttributeValues(DFLT_STR_VAL, 1);
+ }
+
+ checkAttributeValues("test", 1);
+ }
+ }
+
+ checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL);
+ }
+
+ /** */
+ @Test
+ public void testSnapshotScopeUnorderedClosing() {
+ OperationContextSnapshot snapshot;
+
+ try (Scope ignored = OperationContext.set(STR_ATTR, "test1")) {
+ checkAttributeValues("test1", DFLT_INT_VAL);
+
+ snapshot = OperationContext.createSnapshot();
+ }
+
+ try (Scope snpScope = OperationContext.restoreSnapshot(snapshot)) {
+ try (Scope ignored1 = OperationContext.set(INT_ATTR, 2)) {
+ checkAttributeValues("test1", 2);
+
+ assertThrowsWithCause(snpScope::close, AssertionError.class);
+ }
+ }
+
+ checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL);
+ }
+
+ /** */
+ @Test
+ public void testContextAwareThreadPool() throws Exception {
+ OperationContextAwareThreadPoolExecutor pool = deferShutdown(new
OperationContextAwareThreadPoolExecutor(
+ "test",
+ null,
+ 1,
+ 1,
+ Long.MAX_VALUE,
+ new LinkedBlockingQueue<>(),
+ GridIoPolicy.UNDEFINED,
+ null));
+
+ doContextAwareExecutorServiceTest(pool);
+ }
+
+ /** */
+ @Test
+ public void testContextAwareStripedThreadPoolExecutor() throws Exception {
+ OperationContextAwareStripedThreadPoolExecutor pool =
deferShutdown(new OperationContextAwareStripedThreadPoolExecutor(
+ 2,
+ getTestIgniteInstanceName(0),
+ "",
+ (t, e) -> log.error("", e),
+ false,
+ 0
+ ));
+
+ BiConsumerX<String, Integer> checks = (s, i) -> pool.execute(new
AttributeValueChecker(s, i), 1);
+
+ createAttributeChecks(checks);
+
+ AttributeValueChecker.assertAllCreatedChecksPassed();
+ }
+
+ /** */
+ @Test
+ public void testContextAwareStripedExecutor() throws Exception {
+ OperationContextAwareStripedExecutor pool = deferShutdown(new
OperationContextAwareStripedExecutor(
+ 2,
+ getTestIgniteInstanceName(0),
+ "",
+ log,
+ e -> {},
+ false,
+ null,
+ getTestTimeout()
+ ));
+
+ BiConsumerX<String, Integer> checks = (s, i) -> {
+ pool.execute( new AttributeValueChecker(s, i));
+ pool.execute(1, new AttributeValueChecker(s, i));
+ };
+
+ createAttributeChecks(checks);
+
+ AttributeValueChecker.assertAllCreatedChecksPassed();
+ }
+
+ /** */
+ private void doContextAwareExecutorServiceTest(ExecutorService pool)
throws Exception {
+ CountDownLatch poolUnblockedLatch = blockPool(pool);
+
+ BiConsumerX<String, Integer> asyncChecks = (s, i) -> {
+ pool.submit((Runnable)new AttributeValueChecker(s, i));
+ pool.submit(new AttributeValueChecker(s, i), 0);
+ pool.submit((Callable<Integer>)new AttributeValueChecker(s, i));
+ };
+
+ BiConsumerX<String, Integer> syncChecks = (s, i) -> {
+ pool.invokeAny(List.of((Callable<Integer>)new
AttributeValueChecker(s, i)));
+ pool.invokeAny(List.of((Callable<Integer>)new
AttributeValueChecker(s, i)), 1000, MILLISECONDS);
+ pool.invokeAll(List.of((Callable<Integer>)new
AttributeValueChecker(s, i)));
+ pool.invokeAll(List.of((Callable<Integer>)new
AttributeValueChecker(s, i)), 1000, MILLISECONDS);
+ };
+
+ createAttributeChecks(asyncChecks);
+
+ poolUnblockedLatch.countDown();
+
+ createAttributeChecks(syncChecks);
+
+ AttributeValueChecker.assertAllCreatedChecksPassed();
+ }
+
+ /** */
+ private CountDownLatch blockPool(ExecutorService pool) {
+ CountDownLatch latch = new CountDownLatch(1);
+
+ pool.submit(() -> {
+ try {
+ latch.await();
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ return latch;
+ }
+
+ /** */
+ private <T extends ExecutorService> T deferShutdown(T pool) {
+ poolToShutdownAfterTest = pool;
+
+ return pool;
+ }
+
+ /** */
+ private void createAttributeChecks(BiConsumerX<String, Integer>
checkGenerator) throws Exception {
+ try (Scope ignored = OperationContext.set(STR_ATTR, "test1", INT_ATTR,
1)) {
+ checkGenerator.accept("test1", 1);
+ }
+
+ try (Scope ignored = OperationContext.set(STR_ATTR, "test2", INT_ATTR,
2)) {
+ checkGenerator.accept("test2", 2);
+ }
+
+ checkGenerator.accept(DFLT_STR_VAL, DFLT_INT_VAL);
+ }
+
+ /** */
+ private static void checkAttributeValues(String strAttrVal, Integer
intAttrVal) {
+ assertEquals(intAttrVal, OperationContext.get(INT_ATTR));
+ assertEquals(strAttrVal, OperationContext.get(STR_ATTR));
+ }
+
+ /** */
+ private static class AttributeValueChecker extends CompletableFuture<Void>
implements Runnable, Callable<Integer> {
+ /** */
+ static final List<AttributeValueChecker> CHECKS = new ArrayList<>();
+
+ /** */
+ private final String strAttrVal;
+
+ /** */
+ private final Integer intAttrVal;
+
+ /** */
+ public AttributeValueChecker(String strAttrVal, Integer intAttrVal) {
+ this.strAttrVal = strAttrVal;
+ this.intAttrVal = intAttrVal;
+
+ CHECKS.add(this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ try {
+ checkAttributeValues(strAttrVal, intAttrVal);
+
+ complete(null);
+ }
+ catch (Throwable e) {
+ completeExceptionally(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Integer call() {
+ run();
+
+ return 0;
+ }
+
+ /** */
+ static void assertAllCreatedChecksPassed() throws Exception {
+ for (AttributeValueChecker check : CHECKS) {
+ check.get(1000, MILLISECONDS);
+ }
+ }
+ }
+
+ /** */
+ private interface BiConsumerX<T, U> {
+ /** */
+ void accept(T t, U u) throws Exception;
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java
index 52d0c722bb0..ba4e875162c 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java
@@ -73,6 +73,7 @@ import
org.apache.ignite.internal.processors.security.scheduler.SchedulerRemoteS
import
org.apache.ignite.internal.processors.security.service.ServiceAuthorizationTest;
import
org.apache.ignite.internal.processors.security.service.ServiceStaticConfigTest;
import
org.apache.ignite.internal.processors.security.snapshot.SnapshotPermissionCheckTest;
+import
org.apache.ignite.internal.thread.context.OperationContextAttributesTest;
import org.apache.ignite.ssl.MultipleSSLContextsTest;
import org.apache.ignite.tools.junit.JUnitTeamcityReporter;
import org.junit.BeforeClass;
@@ -145,6 +146,7 @@ import org.junit.runners.Suite;
ActivationOnJoinWithoutPermissionsWithPersistenceTest.class,
SecurityContextInternalFuturePropagationTest.class,
NodeConnectionCertificateCapturingTest.class,
+ OperationContextAttributesTest.class,
})
public class SecurityTestSuite {
/** */
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/authentication/SqlUserCommandSelfTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/authentication/SqlUserCommandSelfTest.java
index 524e9992736..201ea7140fe 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/authentication/SqlUserCommandSelfTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/authentication/SqlUserCommandSelfTest.java
@@ -26,8 +26,8 @@ import
org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import
org.apache.ignite.internal.processors.authentication.IgniteAccessControlException;
import
org.apache.ignite.internal.processors.authentication.UserManagementException;
-import org.apache.ignite.internal.processors.security.OperationSecurityContext;
import org.apache.ignite.internal.processors.security.SecurityContext;
+import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -268,7 +268,7 @@ public class SqlUserCommandSelfTest extends
GridCommonAbstractTest {
private void doSqlAs(int nodeIdx, String sql, String login, String pwd)
throws Exception {
SecurityContext secCtx = authenticate(grid(0), login, pwd);
- try (OperationSecurityContext ignored =
grid(nodeIdx).context().security().withContext(secCtx)) {
+ try (Scope ignored =
grid(nodeIdx).context().security().withContext(secCtx)) {
doSql(nodeIdx, sql);
}
}