This is an automated email from the ASF dual-hosted git repository.

petrov-mg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a61b2d7f6b IGNITE-28808 Restricted distributed Operation Context 
attribute registration after node started (#13275)
7a61b2d7f6b is described below

commit 7a61b2d7f6b31e945f4532403d634d80d4802fdc
Author: Vladimir Steshin <[email protected]>
AuthorDate: Fri Jun 26 09:08:03 2026 +0300

    IGNITE-28808 Restricted distributed Operation Context attribute 
registration after node started (#13275)
---
 .../ignite/internal/CoreMessagesProvider.java      |   2 +-
 .../apache/ignite/internal/GridKernalContext.java  |  10 +-
 .../ignite/internal/GridKernalContextImpl.java     |   6 +
 .../org/apache/ignite/internal/IgniteKernal.java   |  13 ++
 ...xtMessage.java => OperationContextMessage.java} |  10 +-
 .../managers/communication/GridIoManager.java      |   5 +-
 .../managers/communication/GridIoMessage.java      |   6 +-
 .../wal/reader/StandaloneGridKernalContext.java    |   9 +
 ...anager.java => OperationContextDispatcher.java} |  73 ++++---
 .../ignite/spi/discovery/tcp/ClientImpl.java       |   6 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java       |   5 +-
 .../ignite/spi/discovery/tcp/TcpDiscoveryImpl.java |   6 +
 .../tcp/messages/TcpDiscoveryAbstractMessage.java  |   4 +-
 .../context/OperationContextAttributesTest.java    | 213 ++++++++++++---------
 14 files changed, 214 insertions(+), 154 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
index 936eef8bf39..f40b14826cc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
@@ -687,7 +687,7 @@ public class CoreMessagesProvider extends 
AbstractMarshallableMessageFactoryProv
 
         // [13400 - 13500]: Operation context messages.
         msgIdx = 13400;
-        withNoSchema(DistributedOperationContextMessage.class);
+        withNoSchema(OperationContextMessage.class);
 
         // [13500 - 13600]: Rolling Upgrade messages.
         msgIdx = 13500;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 744a844f88e..cab4d2a8146 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -78,6 +78,7 @@ import 
org.apache.ignite.internal.processors.task.GridTaskProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.processors.tracing.Tracing;
 import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
+import org.apache.ignite.internal.thread.context.OperationContextDispatcher;
 import org.apache.ignite.internal.util.IgniteExceptionRegistry;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.worker.WorkersRegistry;
@@ -211,12 +212,19 @@ public interface GridKernalContext extends 
Iterable<GridComponent> {
     public MaintenanceRegistry maintenanceRegistry();
 
     /**
-     * Gets core message factoy.
+     * Gets core message factory.
      *
      * @return Core message factory.
      */
     public MessageFactory messageFactory();
 
+    /**
+     * Gets the distributed operation context dispatcher.
+     *
+     * @return The distributed operation context dispatcher.
+     */
+    public OperationContextDispatcher operationContextDispatcher();
+
     /**
      * Gets transformation processor.
      *
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 5d779302d1f..fb3dcdb3412 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -101,6 +101,7 @@ import 
org.apache.ignite.internal.processors.task.GridTaskProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.processors.tracing.Tracing;
 import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
+import org.apache.ignite.internal.thread.context.OperationContextDispatcher;
 import org.apache.ignite.internal.thread.pool.IgniteForkJoinPool;
 import org.apache.ignite.internal.util.IgniteExceptionRegistry;
 import org.apache.ignite.internal.util.spring.IgniteSpringHelper;
@@ -697,6 +698,11 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
         return grid.messageFactory();
     }
 
+    /** {@inheritDoc} */
+    @Override public OperationContextDispatcher operationContextDispatcher() {
+        return grid.operationContextDispatcher();
+    }
+
     /** {@inheritDoc} */
     @Override public CacheObjectTransformerProcessor transformer() {
         return transProc;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 8e92f8118fc..063774e7b13 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -174,6 +174,7 @@ import 
org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
 import org.apache.ignite.internal.suggestions.JvmConfigurationSuggestions;
 import org.apache.ignite.internal.suggestions.OsConfigurationSuggestions;
 import org.apache.ignite.internal.systemview.ConfigurationViewWalker;
+import org.apache.ignite.internal.thread.context.OperationContextDispatcher;
 import org.apache.ignite.internal.util.TimeBag;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -444,6 +445,9 @@ public class IgniteKernal implements IgniteEx, 
Externalizable {
     /** Core message factory. */
     private MessageFactory msgFactory;
 
+    /** Distributed operation context dispatcher. */
+    private OperationContextDispatcher operationCtxDispatcher;
+
     /**
      * No-arg constructor is required by externalization.
      */
@@ -930,6 +934,8 @@ public class IgniteKernal implements IgniteEx, 
Externalizable {
                 longJVMPauseDetector
             );
 
+            operationCtxDispatcher = new OperationContextDispatcher();
+
             startProcessor(new DiagnosticProcessor(ctx));
 
             mBeansMgr = new IgniteMBeansManager(this);
@@ -1152,6 +1158,8 @@ public class IgniteKernal implements IgniteEx, 
Externalizable {
             // All components exept Discovery are started, time to check if 
maintenance is still needed.
             mntcProc.prepareAndExecuteMaintenance();
 
+            operationCtxDispatcher.finishRegistration();
+
             gw.writeLock();
 
             try {
@@ -3059,6 +3067,11 @@ public class IgniteKernal implements IgniteEx, 
Externalizable {
         return msgFactory;
     }
 
+    /** @return Distributed operation context dispatcher. */
+    OperationContextDispatcher operationContextDispatcher() {
+        return operationCtxDispatcher;
+    }
+
     /**
      * Method is responsible for handling the {@link 
EventType#EVT_CLIENT_NODE_DISCONNECTED} event. Notify all the
      * GridComponents that the such even has been occurred (e.g. if the local 
client node disconnected from the cluster
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/OperationContextMessage.java
similarity index 79%
rename from 
modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java
rename to 
modules/core/src/main/java/org/apache/ignite/internal/OperationContextMessage.java
index 42d5c7eda85..9dc9fdb82cc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/OperationContextMessage.java
@@ -17,16 +17,16 @@
 
 package org.apache.ignite.internal;
 
-import 
org.apache.ignite.internal.thread.context.DistributedOperationContextManager;
 import org.apache.ignite.internal.thread.context.OperationContext;
+import org.apache.ignite.internal.thread.context.OperationContextDispatcher;
 import org.apache.ignite.plugin.extensions.communication.Message;
 
 /**
- * Transport for {@link OperationContext} distributed attributes.
+ * Message for {@link OperationContext} distributed attributes.
  *
- * @see DistributedOperationContextManager
+ * @see OperationContextDispatcher
  */
-public class DistributedOperationContextMessage implements Message {
+public class OperationContextMessage implements Message {
     /** Values of operation context attributes. */
     @Order(0)
     public Message[] vals;
@@ -36,7 +36,7 @@ public class DistributedOperationContextMessage implements 
Message {
     public byte idBitmap;
 
     /** Empty constructor for serialization purposes. */
-    public DistributedOperationContextMessage() {
+    public OperationContextMessage() {
         // No-op.
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index bdbc375fc9f..a83e6ae8e7d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -102,7 +102,6 @@ import org.apache.ignite.internal.processors.tracing.MTC;
 import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
 import org.apache.ignite.internal.processors.tracing.Span;
 import org.apache.ignite.internal.processors.tracing.SpanTags;
-import 
org.apache.ignite.internal.thread.context.DistributedOperationContextManager;
 import org.apache.ignite.internal.thread.context.Scope;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -461,7 +460,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Object>>
                 try {
                     GridIoMessage msg0 = (GridIoMessage)msg;
 
-                    try (Scope ignored = 
DistributedOperationContextManager.instance().restoreDistributedAttributes(msg0.opCtxMsg))
 {
+                    try (Scope ignored = 
ctx.operationContextDispatcher().restoreDistributedAttributes(msg0.opCtxMsg)) {
                         onMessage0(nodeId, msg0, msgC);
                     }
                 }
@@ -2055,7 +2054,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Object>>
         else
             res = new GridIoMessage(plc, topic, msg, ordered, timeout, 
skipOnTimeout);
 
-        res.opCtxMsg = 
DistributedOperationContextManager.instance().collectDistributedAttributes();
+        res.opCtxMsg = 
ctx.operationContextDispatcher().collectDistributedAttributes();
 
         return res;
     }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
index cb09122415d..1865868a71a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
@@ -17,9 +17,9 @@
 
 package org.apache.ignite.internal.managers.communication;
 
-import org.apache.ignite.internal.DistributedOperationContextMessage;
 import org.apache.ignite.internal.ExecutorAwareMessage;
 import org.apache.ignite.internal.GridTopicMessage;
+import org.apache.ignite.internal.OperationContextMessage;
 import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
@@ -65,10 +65,10 @@ public class GridIoMessage implements Message, 
SpanTransport {
     @Order(6)
     byte[] span;
 
-    /** Effective operation context attributes. */
+    /** Effective operation context attributes to propagate. */
     @Order(7)
     @GridToStringInclude
-    public @Nullable DistributedOperationContextMessage opCtxMsg;
+    public @Nullable OperationContextMessage opCtxMsg;
 
     /**
      * Default constructor.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index 48525f5d697..dd0d90b4167 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -103,6 +103,7 @@ import 
org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.processors.tracing.NoopTracing;
 import org.apache.ignite.internal.processors.tracing.Tracing;
 import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
+import org.apache.ignite.internal.thread.context.OperationContextDispatcher;
 import org.apache.ignite.internal.util.IgniteExceptionRegistry;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -176,6 +177,9 @@ public class StandaloneGridKernalContext implements 
GridKernalContext {
     /** Marshaller. */
     private final BinaryMarshaller marsh;
 
+    /** Operation context dispatcher. */
+    private final OperationContextDispatcher opCtxDispatcher = new 
OperationContextDispatcher();
+
     /**
      * @param log Logger.
      * @param ft Node file tree.
@@ -454,6 +458,11 @@ public class StandaloneGridKernalContext implements 
GridKernalContext {
         return null;
     }
 
+    /** {@inheritDoc} */
+    @Override public OperationContextDispatcher operationContextDispatcher() {
+        return opCtxDispatcher;
+    }
+
     /** {@inheritDoc} */
     @Override public CacheObjectTransformerProcessor transformer() {
         return transProc;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/OperationContextDispatcher.java
similarity index 57%
rename from 
modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java
rename to 
modules/core/src/main/java/org/apache/ignite/internal/thread/context/OperationContextDispatcher.java
index 0ef065b7f77..a0669286576 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/OperationContextDispatcher.java
@@ -21,7 +21,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
 import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.DistributedOperationContextMessage;
+import org.apache.ignite.internal.OperationContextMessage;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.jetbrains.annotations.Nullable;
@@ -37,71 +37,62 @@ import org.jetbrains.annotations.Nullable;
  * {@link OperationContextAttribute} instance that is consistent across all 
cluster nodes.</p>
  *
  * <p>To enable propagation of an {@link OperationContextAttribute} value 
across cluster nodes, the
- * attribute must be created using the {@link 
#createDistributedAttribute(byte, Message)} method.
+ * attribute must be registered with the {@link 
#registerDistributedAttribute(int, OperationContextAttribute)} method.
  *
- * <p> Note, that the maximum number of distributed attribute instances that 
can be created is currently limited to
- * {@link #MAX_DISTRIBUTED_ATTR_CNT} for implementation reasons.</p>
+ * <p> Note, that the maximum number of distributed attributes to register is 
currently limited to
+ * {@link #MAX_ATTRS_CNT} for implementation reasons.</p>
  *
  * @see OperationContext
- * @see DistributedOperationContextMessage
+ * @see OperationContextMessage
  */
-public class DistributedOperationContextManager {
-    /** */
-    private static final DistributedOperationContextManager INSTANCE = new 
DistributedOperationContextManager();
-
+public class OperationContextDispatcher {
     /** Maximal number of supported distributed attributes. */
-    static final byte MAX_DISTRIBUTED_ATTR_CNT = Byte.SIZE;
+    static final byte MAX_ATTRS_CNT = Byte.SIZE;
 
     /** Registered distributed attributes by their cluster-wide id. */
-    private final Map<Byte, OperationContextAttribute<Message>> attrs = new 
ConcurrentSkipListMap<>();
+    private final Map<Byte, OperationContextAttribute<? extends Message>> 
attrs = new ConcurrentSkipListMap<>();
 
-    /** */
-    public static DistributedOperationContextManager instance() {
-        return INSTANCE;
-    }
+    /** Whether the registration of new distributed attributes is finished (i.e. no longer allowed). */
+    private volatile boolean regFinished;
 
     /**
-     * Creates a new {@link OperationContext} attribute with the specified 
distributed ID and initial value.
+     * Registers an attribute of {@link OperationContext} with the specified 
distributed ID.
      *
      * <p>The distributed ID is used to consistently identify the attribute 
across all nodes in the cluster.
-     * It must be unique, and its value must be in the range from {@code 0} 
(inclusive) to {@code Byte.SIZE} (exclusive).</p>
-     *
-     * <p>The value of the created attribute is automatically captured and 
propagated between cluster nodes
-     * during message transmission.</p>
+     * It must be unique, and its value must be in the range [{@code 0} : 
{@code Byte.SIZE}).</p>
      *
-     * @see OperationContextAttribute#newInstance(Object)
+     * <p>Registered attribute value is automatically captured and propagated 
between cluster nodes
+     * during message transmission.</p>
      */
-    public <T extends Message> OperationContextAttribute<T> 
createDistributedAttribute(byte id, @Nullable T initVal) {
-        assert id >= 0 && id < MAX_DISTRIBUTED_ATTR_CNT : "Invalid distributed 
attributed id [id=" + id + ']';
+    public <T extends Message> void registerDistributedAttribute(int id, 
OperationContextAttribute<T> attr) {
+        if (regFinished)
+            throw new IgniteException("Initialization of distributed operation 
context attributes has already finished.");
 
-        return (OperationContextAttribute<T>)attrs.compute(id, (id0, attr0) -> 
{
-            if (attr0 != null)
-                throw new IgniteException("Duplicated distributed attribute id 
[id=" + id + ']');
+        assert id >= 0 && id < MAX_ATTRS_CNT : "Invalid distributed attributed 
id [id=" + id + ']';
 
-            return OperationContextAttribute.newInstance(initVal);
-        });
+        if (attrs.putIfAbsent((byte)id, attr) != null)
+            throw new IgniteException("Duplicated distributed attribute id 
[id=" + id + ']');
     }
 
     /**
-     * Collects the values of all distributed {@link 
OperationContextAttribute}s registered by this manager in a format
-     * suitable for transmission between cluster nodes.
+     * Collects the values of all distributed {@link 
OperationContextAttribute}s registered by this dispatcher.
      *
      * @see OperationContext#get(OperationContextAttribute)
      */
-    public @Nullable DistributedOperationContextMessage 
collectDistributedAttributes() {
-        DistributedOperationContextMessage res = null;
+    public @Nullable OperationContextMessage collectDistributedAttributes() {
+        OperationContextMessage res = null;
         List<Message> vals = null;
 
-        for (Map.Entry<Byte, OperationContextAttribute<Message>> e : 
attrs.entrySet()) {
+        for (Map.Entry<Byte, OperationContextAttribute<? extends Message>> e : 
attrs.entrySet()) {
             OperationContextAttribute<? extends Message> attr = e.getValue();
 
             Message curVal = OperationContext.get(attr);
 
             if (curVal != attr.initialValue()) {
                 if (res == null) {
-                    res = new DistributedOperationContextMessage();
+                    res = new OperationContextMessage();
 
-                    vals = new ArrayList<>(MAX_DISTRIBUTED_ATTR_CNT / 2);
+                    vals = new ArrayList<>(MAX_ATTRS_CNT / 2);
                 }
 
                 byte mask = (byte)(1 << e.getKey());
@@ -120,13 +111,13 @@ public class DistributedOperationContextManager {
     }
 
     /** Restores distributed {@link OperationContextAttribute} values received 
from a remote node. */
-    public Scope restoreDistributedAttributes(@Nullable 
DistributedOperationContextMessage msg) {
+    public Scope restoreDistributedAttributes(@Nullable 
OperationContextMessage msg) {
         if (msg == null)
             return Scope.NOOP_SCOPE;
 
         assert msg.idBitmap != 0;
         assert !F.isEmpty(msg.vals);
-        assert msg.vals.length <= MAX_DISTRIBUTED_ATTR_CNT;
+        assert msg.vals.length <= MAX_ATTRS_CNT;
 
         OperationContext.ContextUpdater updater = 
OperationContext.ContextUpdater.create();
 
@@ -136,7 +127,7 @@ public class DistributedOperationContextManager {
             while ((msg.idBitmap & (1 << maskIdx)) == 0)
                 ++maskIdx;
 
-            OperationContextAttribute<Message> attr = attrs.get(maskIdx++);
+            OperationContextAttribute<Message> attr = 
(OperationContextAttribute<Message>)attrs.get(maskIdx++);
 
             assert attr != null;
 
@@ -146,8 +137,8 @@ public class DistributedOperationContextManager {
         return updater.apply();
     }
 
-    /** For testing purposes mostly. */
-    void clear() {
-        attrs.clear();
+    /** Restricts further registration of distributed attributes. */
+    public void finishRegistration() {
+        regFinished = true;
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index ccf622be6ea..86a4f190026 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -70,7 +70,6 @@ import org.apache.ignite.internal.processors.tracing.SpanTags;
 import org.apache.ignite.internal.processors.tracing.messages.SpanContainer;
 import org.apache.ignite.internal.processors.tracing.messages.TraceableMessage;
 import 
org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable;
-import 
org.apache.ignite.internal.thread.context.DistributedOperationContextManager;
 import org.apache.ignite.internal.thread.context.Scope;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -1312,7 +1311,7 @@ class ClientImpl extends TcpDiscoveryImpl {
          * @param msg Message.
          */
         private void sendMessage(TcpDiscoveryAbstractMessage msg) {
-            msg.opCtxMsg = 
DistributedOperationContextManager.instance().collectDistributedAttributes();
+            msg.opCtxMsg = 
operationCtxDispatcher.collectDistributedAttributes();
 
             synchronized (mux) {
                 queue.add(msg);
@@ -1765,8 +1764,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                         ? (TcpDiscoveryAbstractMessage)msg
                         : null;
 
-                    try (Scope ignored = 
DistributedOperationContextManager.instance()
-                        .restoreDistributedAttributes(dm == null ? null : 
dm.opCtxMsg)) {
+                    try (Scope ignored = 
operationCtxDispatcher.restoreDistributedAttributes(dm == null ? null : 
dm.opCtxMsg)) {
                         if (msg instanceof JoinTimeout) {
                             int joinCnt0 = ((JoinTimeout)msg).joinCnt;
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 75c24745be3..9af1a4200d6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -95,7 +95,6 @@ import org.apache.ignite.internal.processors.tracing.SpanTags;
 import org.apache.ignite.internal.processors.tracing.messages.SpanContainer;
 import org.apache.ignite.internal.processors.tracing.messages.TraceableMessage;
 import 
org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable;
-import 
org.apache.ignite.internal.thread.context.DistributedOperationContextManager;
 import org.apache.ignite.internal.thread.context.Scope;
 import org.apache.ignite.internal.thread.pool.IgniteThreadPoolExecutor;
 import org.apache.ignite.internal.util.GridBoundedLinkedHashSet;
@@ -3019,7 +3018,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             }
 
             if (!fromSocket)
-                msg.opCtxMsg = 
DistributedOperationContextManager.instance().collectDistributedAttributes();
+                msg.opCtxMsg = 
operationCtxDispatcher.collectDistributedAttributes();
 
             if (msg instanceof TraceableMessage tMsg) {
 
@@ -3291,7 +3290,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (msg == WAKEUP)
                 return;
 
-            try (Scope ignored = 
DistributedOperationContextManager.instance().restoreDistributedAttributes(msg.opCtxMsg))
 {
+            try (Scope ignored = 
operationCtxDispatcher.restoreDistributedAttributes(msg.opCtxMsg)) {
                 processMessage0(msg);
             }
         }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index 789f3d0adb1..25ad69c147f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -43,6 +43,7 @@ import 
org.apache.ignite.internal.processors.cluster.NodeFullMetricsMessage;
 import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage;
 import org.apache.ignite.internal.processors.tracing.NoopTracing;
 import org.apache.ignite.internal.processors.tracing.Tracing;
+import org.apache.ignite.internal.thread.context.OperationContextDispatcher;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -139,6 +140,9 @@ abstract class TcpDiscoveryImpl {
     /** Tracing. */
     protected Tracing tracing;
 
+    /** Distributed operation context dispatcher. */
+    protected final OperationContextDispatcher operationCtxDispatcher;
+
     /**
      * @param spi Adapter.
      */
@@ -151,6 +155,8 @@ abstract class TcpDiscoveryImpl {
             tracing = ((IgniteEx)spi.ignite()).context().tracing();
         else
             tracing = new NoopTracing();
+
+        operationCtxDispatcher = 
((IgniteEx)spi.ignite()).context().operationContextDispatcher();
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index 60c38866034..5f09498060e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -21,7 +21,7 @@ import java.io.Externalizable;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
-import org.apache.ignite.internal.DistributedOperationContextMessage;
+import org.apache.ignite.internal.OperationContextMessage;
 import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -80,7 +80,7 @@ public abstract class TcpDiscoveryAbstractMessage implements 
Message {
     /** Operation context attributes message. */
     @GridToStringInclude
     @Order(5)
-    public @Nullable DistributedOperationContextMessage opCtxMsg;
+    public @Nullable OperationContextMessage opCtxMsg;
 
     /**
      * Default no-arg constructor for {@link Externalizable} interface.
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
index 3571371f2eb..b4003de3bcf 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
@@ -22,10 +22,12 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.ExecutorService;
@@ -40,6 +42,8 @@ import java.util.function.Supplier;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -59,6 +63,7 @@ import 
org.apache.ignite.internal.thread.pool.IgniteScheduledThreadPoolExecutor;
 import org.apache.ignite.internal.thread.pool.IgniteStripedExecutor;
 import org.apache.ignite.internal.thread.pool.IgniteStripedThreadPoolExecutor;
 import org.apache.ignite.internal.thread.pool.IgniteThreadPoolExecutor;
+import org.apache.ignite.internal.util.GridByteArrayList;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -69,6 +74,9 @@ import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.AbstractTestPluginProvider;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.plugin.PluginProvider;
 import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.thread.IgniteThread;
@@ -77,6 +85,7 @@ import org.springframework.lang.NonNull;
 import org.springframework.lang.Nullable;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
 import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
 import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -101,6 +110,9 @@ public class OperationContextAttributesTest extends GridCommonAbstractTest {
     /** */
     private int beforeTestReservedAttrIds;
 
+    /** */
+    private @Nullable PluginProvider pluginProvider;
+
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
@@ -121,8 +133,16 @@ public class OperationContextAttributesTest extends GridCommonAbstractTest {
 
         // Releases attribute IDs reserved during the test.
         OperationContextAttribute.ID_GEN.set(beforeTestReservedAttrIds);
+    }
 
-        DistributedOperationContextManager.instance().clear();
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        if (pluginProvider != null)
+            cfg.setPluginProviders(pluginProvider);
+
+        return cfg;
     }
 
     /** */
@@ -831,36 +851,82 @@ public class OperationContextAttributesTest extends GridCommonAbstractTest {
     /** */
     @Test
     public void testSendAttributesByDiscovery() throws Exception {
-        byte attrId1 = 0;
-        byte attrId2 = DistributedOperationContextManager.MAX_DISTRIBUTED_ATTR_CNT - 1;
+        doTestOperationContextAttributesPropagation(true);
+    }
 
-        InetSocketAddressMessage dfltDistAttr1Val = new InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80);
-        GridCacheVersion dfltDistrAttr2Val = new GridCacheVersion(1, 1, 1);
+    /** */
+    @Test
+    public void testSendAttributesByCommunication() throws Exception {
+        doTestOperationContextAttributesPropagation(false);
+    }
 
-        // Local attribute 1.
-        OperationContextAttribute.newInstance(1000);
+    /** */
+    private void doTestOperationContextAttributesPropagation(boolean discovery) throws Exception {
+        OperationContextAttribute<InetSocketAddressMessage> dAttr1 =
+            OperationContextAttribute.newInstance(new InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80));
 
-        // Distributed attribute 1.
-        OperationContextAttribute<InetSocketAddressMessage> dAttr1 = DistributedOperationContextManager.instance()
-            .createDistributedAttribute(attrId1, dfltDistAttr1Val);
+        OperationContextAttribute<GridCacheVersion> dAttr2 = OperationContextAttribute.newInstance(new GridCacheVersion(1, 1, 1));
 
-        // Local attribute 2.
-        OperationContextAttribute.newInstance("locaAttr2");
+        OperationContextAttribute<GridByteArrayList> otherTestAttr = OperationContextAttribute.newInstance(new GridByteArrayList());
+
+        pluginProvider = new AbstractTestPluginProvider() {
+            @Override public String name() {
+                return "TestDistributedOperationContextAttributesRegistrator";
+            }
+
+            @Override public void start(PluginContext ctx) {
+                GridKernalContext kctx = ((IgniteEx)ctx.grid()).context();
+
+                kctx.operationContextDispatcher().registerDistributedAttribute(0, dAttr1);
+
+                kctx.operationContextDispatcher().registerDistributedAttribute(OperationContextDispatcher.MAX_ATTRS_CNT - 1, dAttr2);
 
-        // Distributed attribute 2.
-        OperationContextAttribute<GridCacheVersion> dAttr2 = DistributedOperationContextManager.instance()
-            .createDistributedAttribute(attrId2, dfltDistrAttr2Val);
+                assertThrowsAnyCause(
+                    log,
+                    () -> {
+                        kctx.operationContextDispatcher().registerDistributedAttribute(0, otherTestAttr);
+                        return null;
+
+                    }, IgniteException.class,
+                    "Duplicated distributed attribute id"
+                );
+            }
+        };
+
+        // Local attribute 1.
+        OperationContextAttribute.newInstance(1000);
 
         startGrids(2);
         startClientGrid(2);
 
-        CountDownLatch coordLatch = new CountDownLatch(3);
-        CountDownLatch srvrLatch = new CountDownLatch(3);
-        CountDownLatch clientLatch = new CountDownLatch(3);
+        assertThrows(
+            null,
+            () -> grid(0).context().operationContextDispatcher().registerDistributedAttribute(1, null),
+            IgniteException.class,
+            "Initialization of distributed operation context attributes has already finished"
+        );
 
-        InetSocketAddressMessage valToSend1 = new InetSocketAddressMessage(dfltDistAttr1Val.address(), 443);
+        // Local attribute 2.
+        OperationContextAttribute.newInstance("locaAttr2");
+
+        InetSocketAddressMessage valToSend1 = new InetSocketAddressMessage(dAttr1.initialValue().address(), 443);
         GridCacheVersion valToSend2 = new GridCacheVersion(2, 2, 2);
 
+        if (discovery)
+            doTestOperationContextAttributesPropagationThroughDiscovery(dAttr1, valToSend1, dAttr2, valToSend2);
+        else
+            doTestOperationContextAttributesPropagationThroughCommunication(dAttr1, valToSend1, dAttr2, valToSend2);
+    }
+
+    /** */
+    private void doTestOperationContextAttributesPropagationThroughDiscovery(
+        OperationContextAttribute<InetSocketAddressMessage> dAttr1,
+        InetSocketAddressMessage valToSend1,
+        OperationContextAttribute<GridCacheVersion> dAttr2,
+        GridCacheVersion valToSend2
+    ) throws Exception {
+        Set<Integer> checkedNodes = ConcurrentHashMap.newKeySet();
+
         for (int i = 0; i < G.allGrids().size(); ++i) {
             int i0 = i;
 
@@ -872,22 +938,12 @@ public class OperationContextAttributesTest extends GridCommonAbstractTest {
                         InetSocketAddressMessage receivedVal1 = OperationContext.get(dAttr1);
                         GridCacheVersion receivedVal2 = OperationContext.get(dAttr2);
 
-                        assertNotNull(receivedVal1);
-                        assertNotNull(receivedVal2);
-
-                        assertFalse(dfltDistAttr1Val.port() == receivedVal1.port());
-                        assertEquals(receivedVal1.port(), valToSend1.port());
-                        assertEquals(receivedVal1.address(), valToSend1.address());
+                        assertTrue(receivedVal1 != null && valToSend1.port() == receivedVal1.port());
+                        assertTrue(receivedVal1 != null && valToSend1.address().equals(receivedVal1.address()));
 
-                        assertFalse(dfltDistrAttr2Val.equals(receivedVal2));
-                        assertTrue(valToSend2.equals(receivedVal2));
+                        assertEquals(valToSend2, receivedVal2);
 
-                        if (grid(i0).localNode().isClient())
-                            clientLatch.countDown();
-                        else if (grid(i0).localNode().order() == 1)
-                            coordLatch.countDown();
-                        else
-                            srvrLatch.countDown();
+                        checkedNodes.add(i0);
                     }
                 });
         }
@@ -897,58 +953,33 @@ public class OperationContextAttributesTest extends GridCommonAbstractTest {
             grid(0).createCache(defaultCacheConfiguration());
         }
 
-        assertTrue(waitForCondition(() -> coordLatch.getCount() == 2, getTestTimeout()));
-        assertTrue(waitForCondition(() -> srvrLatch.getCount() == 2, getTestTimeout()));
-        assertTrue(waitForCondition(() -> clientLatch.getCount() == 2, getTestTimeout()));
+        assertTrue(waitForCondition(() -> checkedNodes.size() == 3, getTestTimeout(), 50));
+        checkedNodes.clear();
 
         // Send from a server.
         try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2, valToSend2)) {
             grid(1).destroyCache(DEFAULT_CACHE_NAME);
         }
 
-        assertTrue(waitForCondition(() -> coordLatch.getCount() == 1, getTestTimeout()));
-        assertTrue(waitForCondition(() -> srvrLatch.getCount() == 1, getTestTimeout()));
-        assertTrue(waitForCondition(() -> clientLatch.getCount() == 1, getTestTimeout()));
+        assertTrue(waitForCondition(() -> checkedNodes.size() == 3, getTestTimeout(), 50));
+        checkedNodes.clear();
 
         // Send from a client.
         try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2, valToSend2)) {
             grid(2).createCache(defaultCacheConfiguration());
         }
 
-        assertTrue(coordLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS));
-        assertTrue(srvrLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS));
-        assertTrue(clientLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS));
+        assertTrue(waitForCondition(() -> checkedNodes.size() == 3, getTestTimeout(), 50));
+        checkedNodes.clear();
     }
 
     /** */
-    @Test
-    public void testSendAttributesByCommunication() throws Exception {
-        byte attrId1 = 0;
-        byte attrId2 = DistributedOperationContextManager.MAX_DISTRIBUTED_ATTR_CNT - 1;
-
-        InetSocketAddressMessage dfltDistrAttr1Val = new InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80);
-        GridCacheVersion dfltDistrAttr2Val = new GridCacheVersion(1, 1, 1);
-
-        // Local attribute 1.
-        OperationContextAttribute.newInstance(1000);
-
-        // Distributed attribute 1.
-        OperationContextAttribute<InetSocketAddressMessage> dAttr0 = DistributedOperationContextManager.instance()
-            .createDistributedAttribute(attrId1, dfltDistrAttr1Val);
-
-        // Local attribute 2.
-        OperationContextAttribute.newInstance("locaAttr2");
-
-        // Distributed attribute 2.
-        OperationContextAttribute<GridCacheVersion> dAttr1 = DistributedOperationContextManager.instance()
-            .createDistributedAttribute(attrId2, dfltDistrAttr2Val);
-
-        startGrids(2);
-        startClientGrid(2);
-
-        InetSocketAddressMessage valToSend0 = new InetSocketAddressMessage(dfltDistrAttr1Val.address(), 443);
-        GridCacheVersion valToSend1 = new GridCacheVersion(2, 2, 2);
-
+    private void doTestOperationContextAttributesPropagationThroughCommunication(
+        OperationContextAttribute<InetSocketAddressMessage> dAttr1,
+        InetSocketAddressMessage valToSend1,
+        OperationContextAttribute<GridCacheVersion> dAttr2,
+        GridCacheVersion valToSend2
+    ) throws Exception {
         // Coordinator -> Server, Coordinator -> Client, Server -> Client, Client -> Server, etc.
         for (int fromIdx = 0; fromIdx < 3; ++fromIdx) {
             for (int toIdx = 0; toIdx < 3; ++toIdx) {
@@ -956,13 +987,13 @@ public class OperationContextAttributesTest extends GridCommonAbstractTest {
                     continue;
 
                 // One value.
-                try (Scope ignored = OperationContext.set(dAttr0, valToSend0)) {
-                    checkOperationContextCommunicationTransmission(fromIdx, toIdx, dAttr0, null);
+                try (Scope ignored = OperationContext.set(dAttr1, valToSend1)) {
+                    checkOperationContextCommunicationTransmission(fromIdx, toIdx, dAttr1, null);
                 }
 
                 // A couple of values.
-                try (Scope ignored = OperationContext.set(dAttr0, valToSend0, dAttr1, valToSend1)) {
-                    checkOperationContextCommunicationTransmission(fromIdx, toIdx, dAttr0, dAttr1);
+                try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2, valToSend2)) {
+                    checkOperationContextCommunicationTransmission(fromIdx, toIdx, dAttr1, dAttr2);
                 }
             }
         }
@@ -972,44 +1003,44 @@ public class OperationContextAttributesTest extends GridCommonAbstractTest {
     private void checkOperationContextCommunicationTransmission(
         int gridFromIdx,
         int gridToIdx,
-        OperationContextAttribute<InetSocketAddressMessage> attr0,
-        @Nullable OperationContextAttribute<GridCacheVersion> attr1
-    ) throws InterruptedException {
-        Ignite from = grid(gridFromIdx);
-        Ignite to = grid(gridToIdx);
+        OperationContextAttribute<InetSocketAddressMessage> attr1,
+        @Nullable OperationContextAttribute<GridCacheVersion> attr2
+    ) throws Exception {
+        IgniteEx from = grid(gridFromIdx);
+        IgniteEx to = grid(gridToIdx);
 
         CountDownLatch rcvLatch = new CountDownLatch(2);
 
-        InetSocketAddressMessage expVal0 = OperationContext.get(attr0);
-        GridCacheVersion expVal1 = attr1 == null ? null : OperationContext.get(attr1);
+        InetSocketAddressMessage expVal1 = OperationContext.get(attr1);
+        GridCacheVersion expVal2 = attr2 == null ? null : OperationContext.get(attr2);
 
         GridMessageListener lsnr = new GridMessageListener() {
             @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 if (msg instanceof IgniteIoTestMessage && ((IgniteIoTestMessage)msg).request()) {
-                    InetSocketAddressMessage receivedVal0 = OperationContext.get(attr0);
-                    GridCacheVersion receivedVal1 = attr1 == null ? null : OperationContext.get(attr1);
+                    InetSocketAddressMessage receivedVal1 = OperationContext.get(attr1);
+                    GridCacheVersion receivedVal2 = attr2 == null ? null : OperationContext.get(attr2);
 
-                    assertTrue(receivedVal0 != null && expVal0.port() == receivedVal0.port());
-                    assertTrue(receivedVal0 != null && expVal0.address().equals(receivedVal0.address()));
+                    assertTrue(receivedVal1 != null && expVal1.port() == receivedVal1.port());
+                    assertTrue(receivedVal1 != null && expVal1.address().equals(receivedVal1.address()));
 
-                    if (attr1 != null)
-                        assertEquals(expVal1, receivedVal1);
+                    if (attr2 != null)
+                        assertEquals(expVal2, receivedVal2);
 
                     rcvLatch.countDown();
                 }
             }
         };
 
-        ((IgniteEx)to).context().io().addMessageListener(GridTopic.TOPIC_IO_TEST, lsnr);
+        to.context().io().addMessageListener(GridTopic.TOPIC_IO_TEST, lsnr);
 
         try {
-            ((IgniteEx)from).context().io().sendIoTest(node(from, to), null, false);
-            ((IgniteEx)from).context().io().sendIoTest(node(from, to), null, true);
+            from.context().io().sendIoTest(node(from, to), null, false);
+            from.context().io().sendIoTest(node(from, to), null, true);
 
             assertTrue(rcvLatch.await(getTestTimeout(), MILLISECONDS));
         }
         finally {
-            assertTrue(((IgniteEx)to).context().io().removeMessageListener(GridTopic.TOPIC_IO_TEST, lsnr));
+            assertTrue(to.context().io().removeMessageListener(GridTopic.TOPIC_IO_TEST, lsnr));
         }
     }
 


Reply via email to