This is an automated email from the ASF dual-hosted git repository.
petrov-mg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new a2eda8a60b9 IGNITE-28784 Added propagation of OperationContext
attributes to remote nodes via Communication (#13246)
a2eda8a60b9 is described below
commit a2eda8a60b96a3949e87efd5212efc34dc63f73c
Author: Vladimir Steshin <[email protected]>
AuthorDate: Thu Jun 25 13:17:22 2026 +0300
IGNITE-28784 Added propagation of OperationContext attributes to remote
nodes via Communication (#13246)
---
.../ignite/internal/CoreMessagesProvider.java | 2 +-
.../managers/communication/GridIoManager.java | 19 +++-
.../managers/communication/GridIoMessage.java | 6 ++
.../DistributedOperationContextManager.java | 5 +
.../spi/communication/tcp/TcpCommunicationSpi.java | 4 +-
.../context/OperationContextAttributesTest.java | 107 +++++++++++++++++++++
6 files changed, 136 insertions(+), 7 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
index c0191788706..822ac28399a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
@@ -680,7 +680,7 @@ public class CoreMessagesProvider extends
AbstractMarshallableMessageFactoryProv
withNoSchemaResolvedClassLoader(PluginsDataBagItem.class);
withSchema(EventsDataBagItem.class);
- // [13400 - 13600]: Operation context messages.
+ // [13400 - 13500]: Operation context messages.
msgIdx = 13400;
withNoSchema(DistributedOperationContextMessage.class);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 3acf503561a..bdbc375fc9f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -102,6 +102,7 @@ import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
import org.apache.ignite.internal.processors.tracing.Span;
import org.apache.ignite.internal.processors.tracing.SpanTags;
+import
org.apache.ignite.internal.thread.context.DistributedOperationContextManager;
import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -455,10 +456,14 @@ public class GridIoManager extends
GridManagerAdapter<CommunicationSpi<Object>>
ioMetric.register(RCVD_BYTES_CNT, spi::getReceivedBytesCount,
"Received bytes count.");
- getSpi().setListener(commLsnr = new CommunicationListenerEx<Object>() {
+ getSpi().setListener(commLsnr = new CommunicationListenerEx<>() {
@Override public void onMessage(UUID nodeId, Object msg,
IgniteRunnable msgC) {
try {
- onMessage0(nodeId, (GridIoMessage)msg, msgC);
+ GridIoMessage msg0 = (GridIoMessage)msg;
+
+ try (Scope ignored =
DistributedOperationContextManager.instance().restoreDistributedAttributes(msg0.opCtxMsg))
{
+ onMessage0(nodeId, msg0, msgC);
+ }
}
catch (ClassCastException ignored) {
U.error(log, "Communication manager received message of
unknown type (will ignore): " +
@@ -2037,16 +2042,22 @@ public class GridIoManager extends
GridManagerAdapter<CommunicationSpi<Object>>
long timeout,
boolean skipOnTimeout
) {
+ GridIoMessage res;
+
if (ctx.security().enabled()) {
UUID secSubjId = null;
if (!ctx.security().isDefaultContext())
secSubjId = ctx.security().securityContext().subject().id();
- return new GridIoSecurityAwareMessage(secSubjId, plc, topic, msg,
ordered, timeout, skipOnTimeout);
+ res = new GridIoSecurityAwareMessage(secSubjId, plc, topic, msg,
ordered, timeout, skipOnTimeout);
}
+ else
+ res = new GridIoMessage(plc, topic, msg, ordered, timeout,
skipOnTimeout);
+
+ res.opCtxMsg =
DistributedOperationContextManager.instance().collectDistributedAttributes();
- return new GridIoMessage(plc, topic, msg, ordered, timeout,
skipOnTimeout);
+ return res;
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
index 8cc6c106cf2..cb09122415d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.managers.communication;
+import org.apache.ignite.internal.DistributedOperationContextMessage;
import org.apache.ignite.internal.ExecutorAwareMessage;
import org.apache.ignite.internal.GridTopicMessage;
import org.apache.ignite.internal.Order;
@@ -64,6 +65,11 @@ public class GridIoMessage implements Message, SpanTransport
{
@Order(6)
byte[] span;
+ /** Effective operation context attributes. */
+ @Order(7)
+ @GridToStringInclude
+ public @Nullable DistributedOperationContextMessage opCtxMsg;
+
/**
* Default constructor.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java
index 77adfae5e64..0ef065b7f77 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java
@@ -145,4 +145,9 @@ public class DistributedOperationContextManager {
return updater.apply();
}
+
+ /** For testing purposes mostly. */
+ void clear() {
+ attrs.clear();
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 3078e20a0e9..f9ab18727ea 100755
---
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -616,7 +616,7 @@ public class TcpCommunicationSpi extends
TcpCommunicationConfigInitializer {
ctxInitLatch,
client,
igniteExSupplier,
- new CommunicationListener<Message>() {
+ new CommunicationListener<>() {
@Override public void onMessage(UUID nodeId, Message msg,
IgniteRunnable msgC) {
notifyListener(nodeId, msg, msgC);
}
@@ -651,7 +651,7 @@ public class TcpCommunicationSpi extends
TcpCommunicationConfigInitializer {
getWorkersRegistry(ignite),
ignite instanceof IgniteEx ? ((IgniteEx)ignite).context().metric()
: null,
this::createTcpClient,
- new CommunicationListenerEx<Message>() {
+ new CommunicationListenerEx<>() {
@Override public void onMessage(UUID nodeId, Message msg,
IgniteRunnable msgC) {
notifyListener(nodeId, msg, msgC);
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
index bdf84b743ad..3571371f2eb 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@@ -36,10 +37,15 @@ import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
+import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.communication.IgniteIoTestMessage;
import org.apache.ignite.internal.managers.discovery.CustomEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
@@ -68,6 +74,7 @@ import
org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.thread.IgniteThread;
import org.junit.Test;
import org.springframework.lang.NonNull;
+import org.springframework.lang.Nullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static
org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
@@ -114,6 +121,8 @@ public class OperationContextAttributesTest extends
GridCommonAbstractTest {
// Releases attribute IDs reserved during the test.
OperationContextAttribute.ID_GEN.set(beforeTestReservedAttrIds);
+
+ DistributedOperationContextManager.instance().clear();
}
/** */
@@ -911,6 +920,104 @@ public class OperationContextAttributesTest extends
GridCommonAbstractTest {
assertTrue(clientLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS));
}
+ /** */
+ @Test
+ public void testSendAttributesByCommunication() throws Exception {
+ byte attrId1 = 0;
+ byte attrId2 =
DistributedOperationContextManager.MAX_DISTRIBUTED_ATTR_CNT - 1;
+
+ InetSocketAddressMessage dfltDistrAttr1Val = new
InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80);
+ GridCacheVersion dfltDistrAttr2Val = new GridCacheVersion(1, 1, 1);
+
+ // Local attribute 1.
+ OperationContextAttribute.newInstance(1000);
+
+ // Distributed attribute 1.
+ OperationContextAttribute<InetSocketAddressMessage> dAttr0 =
DistributedOperationContextManager.instance()
+ .createDistributedAttribute(attrId1, dfltDistrAttr1Val);
+
+ // Local attribute 2.
+ OperationContextAttribute.newInstance("locaAttr2");
+
+ // Distributed attribute 2.
+ OperationContextAttribute<GridCacheVersion> dAttr1 =
DistributedOperationContextManager.instance()
+ .createDistributedAttribute(attrId2, dfltDistrAttr2Val);
+
+ startGrids(2);
+ startClientGrid(2);
+
+ InetSocketAddressMessage valToSend0 = new
InetSocketAddressMessage(dfltDistrAttr1Val.address(), 443);
+ GridCacheVersion valToSend1 = new GridCacheVersion(2, 2, 2);
+
+ // Coordinator -> Server, Coordinator -> Client, Server -> Client,
Client -> Server, etc.
+ for (int fromIdx = 0; fromIdx < 3; ++fromIdx) {
+ for (int toIdx = 0; toIdx < 3; ++toIdx) {
+ if (fromIdx == toIdx)
+ continue;
+
+ // One value.
+ try (Scope ignored = OperationContext.set(dAttr0, valToSend0))
{
+ checkOperationContextCommunicationTransmission(fromIdx,
toIdx, dAttr0, null);
+ }
+
+ // A couple of values.
+ try (Scope ignored = OperationContext.set(dAttr0, valToSend0,
dAttr1, valToSend1)) {
+ checkOperationContextCommunicationTransmission(fromIdx,
toIdx, dAttr0, dAttr1);
+ }
+ }
+ }
+ }
+
+ /** */
+ private void checkOperationContextCommunicationTransmission(
+ int gridFromIdx,
+ int gridToIdx,
+ OperationContextAttribute<InetSocketAddressMessage> attr0,
+ @Nullable OperationContextAttribute<GridCacheVersion> attr1
+ ) throws InterruptedException {
+ Ignite from = grid(gridFromIdx);
+ Ignite to = grid(gridToIdx);
+
+ CountDownLatch rcvLatch = new CountDownLatch(2);
+
+ InetSocketAddressMessage expVal0 = OperationContext.get(attr0);
+ GridCacheVersion expVal1 = attr1 == null ? null :
OperationContext.get(attr1);
+
+ GridMessageListener lsnr = new GridMessageListener() {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc)
{
+ if (msg instanceof IgniteIoTestMessage &&
((IgniteIoTestMessage)msg).request()) {
+ InetSocketAddressMessage receivedVal0 =
OperationContext.get(attr0);
+ GridCacheVersion receivedVal1 = attr1 == null ? null :
OperationContext.get(attr1);
+
+ assertTrue(receivedVal0 != null && expVal0.port() ==
receivedVal0.port());
+ assertTrue(receivedVal0 != null &&
expVal0.address().equals(receivedVal0.address()));
+
+ if (attr1 != null)
+ assertEquals(expVal1, receivedVal1);
+
+ rcvLatch.countDown();
+ }
+ }
+ };
+
+
((IgniteEx)to).context().io().addMessageListener(GridTopic.TOPIC_IO_TEST, lsnr);
+
+ try {
+ ((IgniteEx)from).context().io().sendIoTest(node(from, to), null,
false);
+ ((IgniteEx)from).context().io().sendIoTest(node(from, to), null,
true);
+
+ assertTrue(rcvLatch.await(getTestTimeout(), MILLISECONDS));
+ }
+ finally {
+
assertTrue(((IgniteEx)to).context().io().removeMessageListener(GridTopic.TOPIC_IO_TEST,
lsnr));
+ }
+ }
+
+ /** Prevents {@link ClusterNode#isLocal()} from being negative. */
+ private ClusterNode node(Ignite from, Ignite to) {
+ return from.cluster().node(((IgniteEx)to).localNode().id());
+ }
+
/** */
private void doContextAwareExecutorServiceTest(ExecutorService pool)
throws Exception {
CountDownLatch poolUnblockedLatch = blockPool(pool);