This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new e847e4d32b8 IGNITE-26230 Introduce MessageFormat (#6466)
e847e4d32b8 is described below

commit e847e4d32b8184e13c3e251433dd728e4870ca85
Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com>
AuthorDate: Fri Aug 22 18:50:42 2025 +0400

    IGNITE-26230 Introduce MessageFormat (#6466)
---
 .../network/serialization/MessageFormat.java       | 39 ++++++++++++++++++++
 .../internal/network/NaiveMessageFormat.java       | 42 ++++++++++++++++++++++
 .../internal/network/netty/InboundDecoder.java     |  9 +++--
 .../internal/network/netty/OutboundEncoder.java    | 11 +++---
 .../internal/network/netty/PipelineUtils.java      |  8 +++--
 .../internal/network/netty/InboundDecoderTest.java | 16 +++++----
 .../network/serialization/MarshallableTest.java    |  7 ++--
 7 files changed, 115 insertions(+), 17 deletions(-)

diff --git 
a/modules/network-api/src/main/java/org/apache/ignite/internal/network/serialization/MessageFormat.java
 
b/modules/network-api/src/main/java/org/apache/ignite/internal/network/serialization/MessageFormat.java
new file mode 100644
index 00000000000..96bc5d329f3
--- /dev/null
+++ 
b/modules/network-api/src/main/java/org/apache/ignite/internal/network/serialization/MessageFormat.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.serialization;
+
+/**
+ * Defines message format, meaning how message should be read or written. 
+ */
+public interface MessageFormat {
+    /**
+     * Creates a new message writer.
+     *
+     * @param serializationRegistry Registry to use.
+     * @param protoVer Binary protocol version to use for serialization.
+     */
+    MessageWriter writer(MessageSerializationRegistry serializationRegistry, 
byte protoVer);
+
+    /**
+     * Creates a new message reader.
+     *
+     * @param serializationRegistry Registry to use.
+     * @param protoVer Binary protocol version to use for deserialization.
+     */
+    MessageReader reader(MessageSerializationRegistry serializationRegistry, 
byte protoVer);
+}
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/NaiveMessageFormat.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/NaiveMessageFormat.java
new file mode 100644
index 00000000000..0103a598f38
--- /dev/null
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/NaiveMessageFormat.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import org.apache.ignite.internal.network.direct.DirectMessageReader;
+import org.apache.ignite.internal.network.direct.DirectMessageWriter;
+import org.apache.ignite.internal.network.serialization.MessageFormat;
+import org.apache.ignite.internal.network.serialization.MessageReader;
+import 
org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.internal.network.serialization.MessageWriter;
+
+/**
+ * Naive message format that uses direct serialization and deserialization. 
Both nodes must be of the same version for this format to work.
+ *
+ * <p>This is the default message format used in Ignite.
+ */
+public class NaiveMessageFormat implements MessageFormat {
+    @Override
+    public MessageWriter writer(MessageSerializationRegistry 
serializationRegistry, byte protoVer) {
+        return new DirectMessageWriter(serializationRegistry, protoVer);
+    }
+
+    @Override
+    public MessageReader reader(MessageSerializationRegistry 
serializationRegistry, byte protoVer) {
+        return new DirectMessageReader(serializationRegistry, protoVer);
+    }
+}
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
index 9a859bb7796..e4096e7662d 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
@@ -27,9 +27,9 @@ import java.util.List;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.NetworkMessage;
-import org.apache.ignite.internal.network.direct.DirectMessageReader;
 import org.apache.ignite.internal.network.message.ClassDescriptorListMessage;
 import org.apache.ignite.internal.network.serialization.MessageDeserializer;
+import org.apache.ignite.internal.network.serialization.MessageFormat;
 import org.apache.ignite.internal.network.serialization.MessageReader;
 import 
org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
 
@@ -52,6 +52,8 @@ public class InboundDecoder extends ByteToMessageDecoder {
     /** Message group type, for partially read message headers. */
     private static final AttributeKey<Short> GROUP_TYPE_KEY = 
AttributeKey.valueOf("GROUP_TYPE");
 
+    private final MessageFormat messageFormat;
+
     /** Serialization service. */
     private final PerSessionSerializationService serializationService;
 
@@ -60,7 +62,8 @@ public class InboundDecoder extends ByteToMessageDecoder {
      *
      * @param serializationService Serialization service.
      */
-    public InboundDecoder(PerSessionSerializationService serializationService) 
{
+    public InboundDecoder(MessageFormat messageFormat, 
PerSessionSerializationService serializationService) {
+        this.messageFormat = messageFormat;
         this.serializationService = serializationService;
     }
 
@@ -73,7 +76,7 @@ public class InboundDecoder extends ByteToMessageDecoder {
         MessageReader reader = readerAttr.get();
 
         if (reader == null) {
-            reader = new 
DirectMessageReader(serializationService.serializationRegistry(), 
ConnectionManager.DIRECT_PROTOCOL_VERSION);
+            reader = 
messageFormat.reader(serializationService.serializationRegistry(), 
ConnectionManager.DIRECT_PROTOCOL_VERSION);
             readerAttr.set(reader);
         }
 
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
index f842190a3f5..82330c28845 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
@@ -32,15 +32,15 @@ import java.util.List;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.OutNetworkObject;
-import org.apache.ignite.internal.network.direct.DirectMessageWriter;
 import org.apache.ignite.internal.network.message.ClassDescriptorListMessage;
 import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
+import org.apache.ignite.internal.network.serialization.MessageFormat;
 import org.apache.ignite.internal.network.serialization.MessageSerializer;
 import org.apache.ignite.internal.network.serialization.MessageWriter;
 import 
org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
 
 /**
- * An encoder for the outbound messages that uses {@link DirectMessageWriter}.
+ * An encoder for the outbound messages that uses the provided {@link 
MessageFormat}.
  */
 public class OutboundEncoder extends MessageToMessageEncoder<OutNetworkObject> 
{
     /** Handler name. */
@@ -53,6 +53,8 @@ public class OutboundEncoder extends 
MessageToMessageEncoder<OutNetworkObject> {
     /** Message writer channel attribute key. */
     private static final AttributeKey<MessageWriter> WRITER_KEY = 
AttributeKey.valueOf("WRITER");
 
+    private final MessageFormat messageFormat;
+
     /** Serialization registry. */
     private final PerSessionSerializationService serializationService;
 
@@ -61,7 +63,8 @@ public class OutboundEncoder extends 
MessageToMessageEncoder<OutNetworkObject> {
      *
      * @param serializationService Serialization service.
      */
-    public OutboundEncoder(PerSessionSerializationService 
serializationService) {
+    public OutboundEncoder(MessageFormat messageFormat, 
PerSessionSerializationService serializationService) {
+        this.messageFormat = messageFormat;
         this.serializationService = serializationService;
     }
 
@@ -71,7 +74,7 @@ public class OutboundEncoder extends 
MessageToMessageEncoder<OutNetworkObject> {
         MessageWriter writer = writerAttr.get();
 
         if (writer == null) {
-            writer = new 
DirectMessageWriter(serializationService.serializationRegistry(), 
ConnectionManager.DIRECT_PROTOCOL_VERSION);
+            writer = 
messageFormat.writer(serializationService.serializationRegistry(), 
ConnectionManager.DIRECT_PROTOCOL_VERSION);
 
             writerAttr.set(writer);
         }
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/PipelineUtils.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/PipelineUtils.java
index 7d5faac9dfc..c905e8ac6e3 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/PipelineUtils.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/PipelineUtils.java
@@ -22,9 +22,11 @@ import io.netty.handler.flush.FlushConsolidationHandler;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.stream.ChunkedWriteHandler;
 import java.util.function.Consumer;
+import org.apache.ignite.internal.network.NaiveMessageFormat;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
 import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.serialization.MessageFormat;
 import 
org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
 
 /** Pipeline utils. */
@@ -58,13 +60,15 @@ public class PipelineUtils {
      */
     public static void setup(ChannelPipeline pipeline, 
PerSessionSerializationService serializationService,
             HandshakeManager handshakeManager, Consumer<InNetworkObject> 
messageListener) {
+        MessageFormat messageFormat = new NaiveMessageFormat();
+
         // Consolidate flushes to bigger ones (improves throughput with 
smaller messages at the price of the latency).
         pipeline.addLast(new 
FlushConsolidationHandler(FlushConsolidationHandler.DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES,
 true));
 
-        pipeline.addLast(InboundDecoder.NAME, new 
InboundDecoder(serializationService));
+        pipeline.addLast(InboundDecoder.NAME, new 
InboundDecoder(messageFormat, serializationService));
         pipeline.addLast(HandshakeHandler.NAME, new 
HandshakeHandler(handshakeManager, messageListener, serializationService));
         pipeline.addLast(CHUNKED_WRITE_HANDLER_NAME, new 
ChunkedWriteHandler());
-        pipeline.addLast(OutboundEncoder.NAME, new 
OutboundEncoder(serializationService));
+        pipeline.addLast(OutboundEncoder.NAME, new 
OutboundEncoder(messageFormat, serializationService));
         pipeline.addLast(IoExceptionSuppressingHandler.NAME, new 
IoExceptionSuppressingHandler());
     }
 
diff --git 
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java
 
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java
index 7a390bc22ed..3f87efd70c8 100644
--- 
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java
+++ 
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java
@@ -36,14 +36,16 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 import java.util.stream.LongStream;
 import org.apache.ignite.internal.network.AllTypesMessageGenerator;
+import org.apache.ignite.internal.network.NaiveMessageFormat;
 import org.apache.ignite.internal.network.NetworkMessage;
-import org.apache.ignite.internal.network.direct.DirectMessageWriter;
 import org.apache.ignite.internal.network.messages.AllTypesMessage;
 import org.apache.ignite.internal.network.messages.NestedMessageMessage;
 import org.apache.ignite.internal.network.messages.TestMessage;
 import org.apache.ignite.internal.network.messages.TestMessagesFactory;
+import org.apache.ignite.internal.network.serialization.MessageFormat;
 import 
org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
 import org.apache.ignite.internal.network.serialization.MessageSerializer;
+import org.apache.ignite.internal.network.serialization.MessageWriter;
 import 
org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
 import org.apache.ignite.internal.network.serialization.SerializationService;
 import 
org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
@@ -63,6 +65,8 @@ public class InboundDecoderTest extends 
BaseIgniteAbstractTest {
     /** Registry. */
     private final MessageSerializationRegistry registry = 
defaultSerializationRegistry();
 
+    private final MessageFormat messageFormat = new NaiveMessageFormat();
+
     /**
      * Tests that an {@link InboundDecoder} can successfully read a message 
with all types supported by direct marshalling.
      *
@@ -99,9 +103,9 @@ public class InboundDecoderTest extends 
BaseIgniteAbstractTest {
     private <T extends NetworkMessage> T sendAndReceive(T msg) {
         var serializationService = new SerializationService(registry, 
mock(UserObjectSerializationContext.class));
         var perSessionSerializationService = new 
PerSessionSerializationService(serializationService);
-        var channel = new EmbeddedChannel(new 
InboundDecoder(perSessionSerializationService));
+        var channel = new EmbeddedChannel(new InboundDecoder(messageFormat, 
perSessionSerializationService));
 
-        var writer = new DirectMessageWriter(registry, 
ConnectionManager.DIRECT_PROTOCOL_VERSION);
+        MessageWriter writer = messageFormat.writer(registry, 
ConnectionManager.DIRECT_PROTOCOL_VERSION);
 
         MessageSerializer<NetworkMessage> serializer = 
registry.createSerializer(msg.groupType(), msg.messageType());
 
@@ -139,7 +143,7 @@ public class InboundDecoderTest extends 
BaseIgniteAbstractTest {
     public void testPartialHeader() throws Exception {
         var serializationService = new SerializationService(registry, 
mock(UserObjectSerializationContext.class));
         var perSessionSerializationService = new 
PerSessionSerializationService(serializationService);
-        var channel = new EmbeddedChannel(new 
InboundDecoder(perSessionSerializationService));
+        var channel = new EmbeddedChannel(new InboundDecoder(messageFormat, 
perSessionSerializationService));
 
         ByteBuf buffer = allocator.buffer();
 
@@ -174,11 +178,11 @@ public class InboundDecoderTest extends 
BaseIgniteAbstractTest {
 
         var serializationService = new SerializationService(registry, 
mock(UserObjectSerializationContext.class));
         var perSessionSerializationService = new 
PerSessionSerializationService(serializationService);
-        final var decoder = new InboundDecoder(perSessionSerializationService);
+        final var decoder = new InboundDecoder(messageFormat, 
perSessionSerializationService);
 
         final var list = new ArrayList<>();
 
-        var writer = new DirectMessageWriter(registry, 
ConnectionManager.DIRECT_PROTOCOL_VERSION);
+        MessageWriter writer = messageFormat.writer(registry, 
ConnectionManager.DIRECT_PROTOCOL_VERSION);
 
         var msg = new 
TestMessagesFactory().testMessage().msg("abcdefghijklmn").build();
 
diff --git 
a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
 
b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
index 3e21902f078..3e6ee0bd3a9 100644
--- 
a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
+++ 
b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
@@ -43,6 +43,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import org.apache.ignite.internal.network.NaiveMessageFormat;
 import org.apache.ignite.internal.network.OutNetworkObject;
 import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
 import org.apache.ignite.internal.network.messages.MessageWithMarshallable;
@@ -68,6 +69,8 @@ public class MarshallableTest extends BaseIgniteAbstractTest {
 
     private final TestMessagesFactory msgFactory = new TestMessagesFactory();
 
+    private final MessageFormat messageFormat = new NaiveMessageFormat();
+
     /**
      * Tests that marshallable object can be serialized along with its 
descriptor.
      */
@@ -95,7 +98,7 @@ public class MarshallableTest extends BaseIgniteAbstractTest {
 
         var channel = new EmbeddedChannel(
                 new ChunkedWriteHandler(),
-                new OutboundEncoder(serializers.perSessionSerializationService)
+                new OutboundEncoder(messageFormat, 
serializers.perSessionSerializationService)
         );
 
         List<ClassDescriptorMessage> classDescriptorsMessages = 
PerSessionSerializationService.createClassDescriptorsMessages(
@@ -134,7 +137,7 @@ public class MarshallableTest extends 
BaseIgniteAbstractTest {
         PerSessionSerializationService perSessionSerializationService = 
serializers.perSessionSerializationService;
         ClassDescriptor descriptor = serializers.descriptor;
 
-        final var decoder = new InboundDecoder(perSessionSerializationService);
+        final var decoder = new InboundDecoder(messageFormat, 
perSessionSerializationService);
 
         int size = outBuffer.position();
 

Reply via email to