This is an automated email from the ASF dual-hosted git repository.

petrov-mg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new a2eda8a60b9 IGNITE-28784 Added propagation of OperationContext 
attributes to remote nodes via Communication (#13246)
a2eda8a60b9 is described below

commit a2eda8a60b96a3949e87efd5212efc34dc63f73c
Author: Vladimir Steshin <[email protected]>
AuthorDate: Thu Jun 25 13:17:22 2026 +0300

    IGNITE-28784 Added propagation of OperationContext attributes to remote 
nodes via Communication (#13246)
---
 .../ignite/internal/CoreMessagesProvider.java      |   2 +-
 .../managers/communication/GridIoManager.java      |  19 +++-
 .../managers/communication/GridIoMessage.java      |   6 ++
 .../DistributedOperationContextManager.java        |   5 +
 .../spi/communication/tcp/TcpCommunicationSpi.java |   4 +-
 .../context/OperationContextAttributesTest.java    | 107 +++++++++++++++++++++
 6 files changed, 136 insertions(+), 7 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
index c0191788706..822ac28399a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
@@ -680,7 +680,7 @@ public class CoreMessagesProvider extends 
AbstractMarshallableMessageFactoryProv
         withNoSchemaResolvedClassLoader(PluginsDataBagItem.class);
         withSchema(EventsDataBagItem.class);
 
-        // [13400 - 13600]: Operation context messages.
+        // [13400 - 13500]: Operation context messages.
         msgIdx = 13400;
         withNoSchema(DistributedOperationContextMessage.class);
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 3acf503561a..bdbc375fc9f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -102,6 +102,7 @@ import org.apache.ignite.internal.processors.tracing.MTC;
 import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
 import org.apache.ignite.internal.processors.tracing.Span;
 import org.apache.ignite.internal.processors.tracing.SpanTags;
+import 
org.apache.ignite.internal.thread.context.DistributedOperationContextManager;
 import org.apache.ignite.internal.thread.context.Scope;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -455,10 +456,14 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Object>>
 
         ioMetric.register(RCVD_BYTES_CNT, spi::getReceivedBytesCount, 
"Received bytes count.");
 
-        getSpi().setListener(commLsnr = new CommunicationListenerEx<Object>() {
+        getSpi().setListener(commLsnr = new CommunicationListenerEx<>() {
             @Override public void onMessage(UUID nodeId, Object msg, 
IgniteRunnable msgC) {
                 try {
-                    onMessage0(nodeId, (GridIoMessage)msg, msgC);
+                    GridIoMessage msg0 = (GridIoMessage)msg;
+
+                    try (Scope ignored = 
DistributedOperationContextManager.instance().restoreDistributedAttributes(msg0.opCtxMsg))
 {
+                        onMessage0(nodeId, msg0, msgC);
+                    }
                 }
                 catch (ClassCastException ignored) {
                     U.error(log, "Communication manager received message of 
unknown type (will ignore): " +
@@ -2037,16 +2042,22 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Object>>
         long timeout,
         boolean skipOnTimeout
     ) {
+        GridIoMessage res;
+
         if (ctx.security().enabled()) {
             UUID secSubjId = null;
 
             if (!ctx.security().isDefaultContext())
                 secSubjId = ctx.security().securityContext().subject().id();
 
-            return new GridIoSecurityAwareMessage(secSubjId, plc, topic, msg, 
ordered, timeout, skipOnTimeout);
+            res = new GridIoSecurityAwareMessage(secSubjId, plc, topic, msg, 
ordered, timeout, skipOnTimeout);
         }
+        else
+            res = new GridIoMessage(plc, topic, msg, ordered, timeout, 
skipOnTimeout);
+
+        res.opCtxMsg = 
DistributedOperationContextManager.instance().collectDistributedAttributes();
 
-        return new GridIoMessage(plc, topic, msg, ordered, timeout, 
skipOnTimeout);
+        return res;
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
index 8cc6c106cf2..cb09122415d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.managers.communication;
 
+import org.apache.ignite.internal.DistributedOperationContextMessage;
 import org.apache.ignite.internal.ExecutorAwareMessage;
 import org.apache.ignite.internal.GridTopicMessage;
 import org.apache.ignite.internal.Order;
@@ -64,6 +65,11 @@ public class GridIoMessage implements Message, SpanTransport 
{
     @Order(6)
     byte[] span;
 
+    /** Effective operation context attributes. */
+    @Order(7)
+    @GridToStringInclude
+    public @Nullable DistributedOperationContextMessage opCtxMsg;
+
     /**
      * Default constructor.
      */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java
index 77adfae5e64..0ef065b7f77 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java
@@ -145,4 +145,9 @@ public class DistributedOperationContextManager {
 
         return updater.apply();
     }
+
+    /** For testing purposes mostly. */
+    void clear() {
+        attrs.clear();
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 3078e20a0e9..f9ab18727ea 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -616,7 +616,7 @@ public class TcpCommunicationSpi extends 
TcpCommunicationConfigInitializer {
             ctxInitLatch,
             client,
             igniteExSupplier,
-            new CommunicationListener<Message>() {
+            new CommunicationListener<>() {
                 @Override public void onMessage(UUID nodeId, Message msg, 
IgniteRunnable msgC) {
                     notifyListener(nodeId, msg, msgC);
                 }
@@ -651,7 +651,7 @@ public class TcpCommunicationSpi extends 
TcpCommunicationConfigInitializer {
             getWorkersRegistry(ignite),
             ignite instanceof IgniteEx ? ((IgniteEx)ignite).context().metric() 
: null,
             this::createTcpClient,
-            new CommunicationListenerEx<Message>() {
+            new CommunicationListenerEx<>() {
                 @Override public void onMessage(UUID nodeId, Message msg, 
IgniteRunnable msgC) {
                     notifyListener(nodeId, msg, msgC);
                 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
index bdf84b743ad..3571371f2eb 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
@@ -36,10 +37,15 @@ import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.communication.IgniteIoTestMessage;
 import org.apache.ignite.internal.managers.discovery.CustomEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
@@ -68,6 +74,7 @@ import 
org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.thread.IgniteThread;
 import org.junit.Test;
 import org.springframework.lang.NonNull;
+import org.springframework.lang.Nullable;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static 
org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
@@ -114,6 +121,8 @@ public class OperationContextAttributesTest extends 
GridCommonAbstractTest {
 
         // Releases attribute IDs reserved during the test.
         OperationContextAttribute.ID_GEN.set(beforeTestReservedAttrIds);
+
+        DistributedOperationContextManager.instance().clear();
     }
 
     /** */
@@ -911,6 +920,104 @@ public class OperationContextAttributesTest extends 
GridCommonAbstractTest {
         assertTrue(clientLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS));
     }
 
+    /** */
+    @Test
+    public void testSendAttributesByCommunication() throws Exception {
+        byte attrId1 = 0;
+        byte attrId2 = 
DistributedOperationContextManager.MAX_DISTRIBUTED_ATTR_CNT - 1;
+
+        InetSocketAddressMessage dfltDistrAttr1Val = new 
InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80);
+        GridCacheVersion dfltDistrAttr2Val = new GridCacheVersion(1, 1, 1);
+
+        // Local attribute 1.
+        OperationContextAttribute.newInstance(1000);
+
+        // Distributed attribute 1.
+        OperationContextAttribute<InetSocketAddressMessage> dAttr0 = 
DistributedOperationContextManager.instance()
+            .createDistributedAttribute(attrId1, dfltDistrAttr1Val);
+
+        // Local attribute 2.
+        OperationContextAttribute.newInstance("locaAttr2");
+
+        // Distributed attribute 2.
+        OperationContextAttribute<GridCacheVersion> dAttr1 = 
DistributedOperationContextManager.instance()
+            .createDistributedAttribute(attrId2, dfltDistrAttr2Val);
+
+        startGrids(2);
+        startClientGrid(2);
+
+        InetSocketAddressMessage valToSend0 = new 
InetSocketAddressMessage(dfltDistrAttr1Val.address(), 443);
+        GridCacheVersion valToSend1 = new GridCacheVersion(2, 2, 2);
+
+        // Coordinator -> Server, Coordinator -> Client, Server -> Client, 
Client -> Server, etc.
+        for (int fromIdx = 0; fromIdx < 3; ++fromIdx) {
+            for (int toIdx = 0; toIdx < 3; ++toIdx) {
+                if (fromIdx == toIdx)
+                    continue;
+
+                // One value.
+                try (Scope ignored = OperationContext.set(dAttr0, valToSend0)) 
{
+                    checkOperationContextCommunicationTransmission(fromIdx, 
toIdx, dAttr0, null);
+                }
+
+                // A couple of values.
+                try (Scope ignored = OperationContext.set(dAttr0, valToSend0, 
dAttr1, valToSend1)) {
+                    checkOperationContextCommunicationTransmission(fromIdx, 
toIdx, dAttr0, dAttr1);
+                }
+            }
+        }
+    }
+
+    /** */
+    private void checkOperationContextCommunicationTransmission(
+        int gridFromIdx,
+        int gridToIdx,
+        OperationContextAttribute<InetSocketAddressMessage> attr0,
+        @Nullable OperationContextAttribute<GridCacheVersion> attr1
+    ) throws InterruptedException {
+        Ignite from = grid(gridFromIdx);
+        Ignite to = grid(gridToIdx);
+
+        CountDownLatch rcvLatch = new CountDownLatch(2);
+
+        InetSocketAddressMessage expVal0 = OperationContext.get(attr0);
+        GridCacheVersion expVal1 = attr1 == null ? null : 
OperationContext.get(attr1);
+
+        GridMessageListener lsnr = new GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) 
{
+                if (msg instanceof IgniteIoTestMessage && 
((IgniteIoTestMessage)msg).request()) {
+                    InetSocketAddressMessage receivedVal0 = 
OperationContext.get(attr0);
+                    GridCacheVersion receivedVal1 = attr1 == null ? null : 
OperationContext.get(attr1);
+
+                    assertTrue(receivedVal0 != null && expVal0.port() == 
receivedVal0.port());
+                    assertTrue(receivedVal0 != null && 
expVal0.address().equals(receivedVal0.address()));
+
+                    if (attr1 != null)
+                        assertEquals(expVal1, receivedVal1);
+
+                    rcvLatch.countDown();
+                }
+            }
+        };
+
+        
((IgniteEx)to).context().io().addMessageListener(GridTopic.TOPIC_IO_TEST, lsnr);
+
+        try {
+            ((IgniteEx)from).context().io().sendIoTest(node(from, to), null, 
false);
+            ((IgniteEx)from).context().io().sendIoTest(node(from, to), null, 
true);
+
+            assertTrue(rcvLatch.await(getTestTimeout(), MILLISECONDS));
+        }
+        finally {
+            
assertTrue(((IgniteEx)to).context().io().removeMessageListener(GridTopic.TOPIC_IO_TEST,
 lsnr));
+        }
+    }
+
+    /** Prevents {@link ClusterNode#isLocal()} to be negative. */
+    private ClusterNode node(Ignite from, Ignite to) {
+        return from.cluster().node(((IgniteEx)to).localNode().id());
+    }
+
     /** */
     private void doContextAwareExecutorServiceTest(ExecutorService pool) 
throws Exception {
         CountDownLatch poolUnblockedLatch = blockPool(pool);

Reply via email to