This is an automated email from the ASF dual-hosted git repository. rpuch pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new e847e4d32b8 IGNITE-26230 Introduce MessageFormat (#6466) e847e4d32b8 is described below commit e847e4d32b8184e13c3e251433dd728e4870ca85 Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Fri Aug 22 18:50:42 2025 +0400 IGNITE-26230 Introduce MessageFormat (#6466) --- .../network/serialization/MessageFormat.java | 39 ++++++++++++++++++++ .../internal/network/NaiveMessageFormat.java | 42 ++++++++++++++++++++++ .../internal/network/netty/InboundDecoder.java | 9 +++-- .../internal/network/netty/OutboundEncoder.java | 11 +++--- .../internal/network/netty/PipelineUtils.java | 8 +++-- .../internal/network/netty/InboundDecoderTest.java | 16 +++++---- .../network/serialization/MarshallableTest.java | 7 ++-- 7 files changed, 115 insertions(+), 17 deletions(-) diff --git a/modules/network-api/src/main/java/org/apache/ignite/internal/network/serialization/MessageFormat.java b/modules/network-api/src/main/java/org/apache/ignite/internal/network/serialization/MessageFormat.java new file mode 100644 index 00000000000..96bc5d329f3 --- /dev/null +++ b/modules/network-api/src/main/java/org/apache/ignite/internal/network/serialization/MessageFormat.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network.serialization; + +/** + * Defines message format, meaning how message should be read or written. + */ +public interface MessageFormat { + /** + * Creates a new message writer. + * + * @param serializationRegistry Registry to use. + * @param protoVer Binary protocol version to use for serialization. + */ + MessageWriter writer(MessageSerializationRegistry serializationRegistry, byte protoVer); + + /** + * Creates a new message reader. + * + * @param serializationRegistry Registry to use. + * @param protoVer Binary protocol version to use for deserialization. + */ + MessageReader reader(MessageSerializationRegistry serializationRegistry, byte protoVer); +} diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/NaiveMessageFormat.java b/modules/network/src/main/java/org/apache/ignite/internal/network/NaiveMessageFormat.java new file mode 100644 index 00000000000..0103a598f38 --- /dev/null +++ b/modules/network/src/main/java/org/apache/ignite/internal/network/NaiveMessageFormat.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network; + +import org.apache.ignite.internal.network.direct.DirectMessageReader; +import org.apache.ignite.internal.network.direct.DirectMessageWriter; +import org.apache.ignite.internal.network.serialization.MessageFormat; +import org.apache.ignite.internal.network.serialization.MessageReader; +import org.apache.ignite.internal.network.serialization.MessageSerializationRegistry; +import org.apache.ignite.internal.network.serialization.MessageWriter; + +/** + * Naive message format that uses direct serialization and deserialization. Both nodes must be of the same version for this format to work. + * + * <p>This is the default message format used in Ignite. + */ +public class NaiveMessageFormat implements MessageFormat { + @Override + public MessageWriter writer(MessageSerializationRegistry serializationRegistry, byte protoVer) { + return new DirectMessageWriter(serializationRegistry, protoVer); + } + + @Override + public MessageReader reader(MessageSerializationRegistry serializationRegistry, byte protoVer) { + return new DirectMessageReader(serializationRegistry, protoVer); + } +} diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java index 9a859bb7796..e4096e7662d 100644 --- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java +++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java @@ -27,9 +27,9 @@ import java.util.List; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.network.NetworkMessage; -import org.apache.ignite.internal.network.direct.DirectMessageReader; import org.apache.ignite.internal.network.message.ClassDescriptorListMessage; import org.apache.ignite.internal.network.serialization.MessageDeserializer; +import org.apache.ignite.internal.network.serialization.MessageFormat; import org.apache.ignite.internal.network.serialization.MessageReader; import org.apache.ignite.internal.network.serialization.PerSessionSerializationService; @@ -52,6 +52,8 @@ public class InboundDecoder extends ByteToMessageDecoder { /** Message group type, for partially read message headers. */ private static final AttributeKey<Short> GROUP_TYPE_KEY = AttributeKey.valueOf("GROUP_TYPE"); + private final MessageFormat messageFormat; + /** Serialization service. */ private final PerSessionSerializationService serializationService; @@ -60,7 +62,8 @@ public class InboundDecoder extends ByteToMessageDecoder { * * @param serializationService Serialization service. */ - public InboundDecoder(PerSessionSerializationService serializationService) { + public InboundDecoder(MessageFormat messageFormat, PerSessionSerializationService serializationService) { + this.messageFormat = messageFormat; this.serializationService = serializationService; } @@ -73,7 +76,7 @@ public class InboundDecoder extends ByteToMessageDecoder { MessageReader reader = readerAttr.get(); if (reader == null) { - reader = new DirectMessageReader(serializationService.serializationRegistry(), ConnectionManager.DIRECT_PROTOCOL_VERSION); + reader = messageFormat.reader(serializationService.serializationRegistry(), ConnectionManager.DIRECT_PROTOCOL_VERSION); readerAttr.set(reader); } diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java index f842190a3f5..82330c28845 100644 --- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java +++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java @@ -32,15 +32,15 @@ import java.util.List; import org.apache.ignite.internal.network.NetworkMessage; import org.apache.ignite.internal.network.NetworkMessagesFactory; import org.apache.ignite.internal.network.OutNetworkObject; -import org.apache.ignite.internal.network.direct.DirectMessageWriter; import org.apache.ignite.internal.network.message.ClassDescriptorListMessage; import org.apache.ignite.internal.network.message.ClassDescriptorMessage; +import org.apache.ignite.internal.network.serialization.MessageFormat; import org.apache.ignite.internal.network.serialization.MessageSerializer; import org.apache.ignite.internal.network.serialization.MessageWriter; import org.apache.ignite.internal.network.serialization.PerSessionSerializationService; /** - * An encoder for the outbound messages that uses {@link DirectMessageWriter}. + * An encoder for the outbound messages that uses the provided {@link MessageFormat}. */ public class OutboundEncoder extends MessageToMessageEncoder<OutNetworkObject> { /** Handler name. */ @@ -53,6 +53,8 @@ public class OutboundEncoder extends MessageToMessageEncoder<OutNetworkObject> { /** Message writer channel attribute key. */ private static final AttributeKey<MessageWriter> WRITER_KEY = AttributeKey.valueOf("WRITER"); + private final MessageFormat messageFormat; + /** Serialization registry. */ private final PerSessionSerializationService serializationService; @@ -61,7 +63,8 @@ public class OutboundEncoder extends MessageToMessageEncoder<OutNetworkObject> { * * @param serializationService Serialization service. */ - public OutboundEncoder(PerSessionSerializationService serializationService) { + public OutboundEncoder(MessageFormat messageFormat, PerSessionSerializationService serializationService) { + this.messageFormat = messageFormat; this.serializationService = serializationService; } @@ -71,7 +74,7 @@ public class OutboundEncoder extends MessageToMessageEncoder<OutNetworkObject> { MessageWriter writer = writerAttr.get(); if (writer == null) { - writer = new DirectMessageWriter(serializationService.serializationRegistry(), ConnectionManager.DIRECT_PROTOCOL_VERSION); + writer = messageFormat.writer(serializationService.serializationRegistry(), ConnectionManager.DIRECT_PROTOCOL_VERSION); writerAttr.set(writer); } diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/PipelineUtils.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/PipelineUtils.java index 7d5faac9dfc..c905e8ac6e3 100644 --- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/PipelineUtils.java +++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/PipelineUtils.java @@ -22,9 +22,11 @@ import io.netty.handler.flush.FlushConsolidationHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.stream.ChunkedWriteHandler; import java.util.function.Consumer; +import org.apache.ignite.internal.network.NaiveMessageFormat; import org.apache.ignite.internal.network.NetworkMessagesFactory; import org.apache.ignite.internal.network.handshake.HandshakeManager; import org.apache.ignite.internal.network.recovery.RecoveryDescriptor; +import org.apache.ignite.internal.network.serialization.MessageFormat; import org.apache.ignite.internal.network.serialization.PerSessionSerializationService; /** Pipeline utils. */ @@ -58,13 +60,15 @@ public class PipelineUtils { */ public static void setup(ChannelPipeline pipeline, PerSessionSerializationService serializationService, HandshakeManager handshakeManager, Consumer<InNetworkObject> messageListener) { + MessageFormat messageFormat = new NaiveMessageFormat(); + // Consolidate flushes to bigger ones (improves throughput with smaller messages at the price of the latency). pipeline.addLast(new FlushConsolidationHandler(FlushConsolidationHandler.DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true)); - pipeline.addLast(InboundDecoder.NAME, new InboundDecoder(serializationService)); + pipeline.addLast(InboundDecoder.NAME, new InboundDecoder(messageFormat, serializationService)); pipeline.addLast(HandshakeHandler.NAME, new HandshakeHandler(handshakeManager, messageListener, serializationService)); pipeline.addLast(CHUNKED_WRITE_HANDLER_NAME, new ChunkedWriteHandler()); - pipeline.addLast(OutboundEncoder.NAME, new OutboundEncoder(serializationService)); + pipeline.addLast(OutboundEncoder.NAME, new OutboundEncoder(messageFormat, serializationService)); pipeline.addLast(IoExceptionSuppressingHandler.NAME, new IoExceptionSuppressingHandler()); } diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java index 7a390bc22ed..3f87efd70c8 100644 --- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java +++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java @@ -36,14 +36,16 @@ import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import java.util.stream.LongStream; import org.apache.ignite.internal.network.AllTypesMessageGenerator; +import org.apache.ignite.internal.network.NaiveMessageFormat; import org.apache.ignite.internal.network.NetworkMessage; -import org.apache.ignite.internal.network.direct.DirectMessageWriter; import org.apache.ignite.internal.network.messages.AllTypesMessage; import org.apache.ignite.internal.network.messages.NestedMessageMessage; import org.apache.ignite.internal.network.messages.TestMessage; import org.apache.ignite.internal.network.messages.TestMessagesFactory; +import org.apache.ignite.internal.network.serialization.MessageFormat; import org.apache.ignite.internal.network.serialization.MessageSerializationRegistry; import org.apache.ignite.internal.network.serialization.MessageSerializer; +import org.apache.ignite.internal.network.serialization.MessageWriter; import org.apache.ignite.internal.network.serialization.PerSessionSerializationService; import org.apache.ignite.internal.network.serialization.SerializationService; import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext; @@ -63,6 +65,8 @@ public class InboundDecoderTest extends BaseIgniteAbstractTest { /** Registry. */ private final MessageSerializationRegistry registry = defaultSerializationRegistry(); + private final MessageFormat messageFormat = new NaiveMessageFormat(); + /** * Tests that an {@link InboundDecoder} can successfully read a message with all types supported by direct marshalling. * @@ -99,9 +103,9 @@ public class InboundDecoderTest extends BaseIgniteAbstractTest { private <T extends NetworkMessage> T sendAndReceive(T msg) { var serializationService = new SerializationService(registry, mock(UserObjectSerializationContext.class)); var perSessionSerializationService = new PerSessionSerializationService(serializationService); - var channel = new EmbeddedChannel(new InboundDecoder(perSessionSerializationService)); + var channel = new EmbeddedChannel(new InboundDecoder(messageFormat, perSessionSerializationService)); - var writer = new DirectMessageWriter(registry, ConnectionManager.DIRECT_PROTOCOL_VERSION); + MessageWriter writer = messageFormat.writer(registry, ConnectionManager.DIRECT_PROTOCOL_VERSION); MessageSerializer<NetworkMessage> serializer = registry.createSerializer(msg.groupType(), msg.messageType()); @@ -139,7 +143,7 @@ public class InboundDecoderTest extends BaseIgniteAbstractTest { public void testPartialHeader() throws Exception { var serializationService = new SerializationService(registry, mock(UserObjectSerializationContext.class)); var perSessionSerializationService = new PerSessionSerializationService(serializationService); - var channel = new EmbeddedChannel(new InboundDecoder(perSessionSerializationService)); + var channel = new EmbeddedChannel(new InboundDecoder(messageFormat, perSessionSerializationService)); ByteBuf buffer = allocator.buffer(); @@ -174,11 +178,11 @@ public class InboundDecoderTest extends BaseIgniteAbstractTest { var serializationService = new SerializationService(registry, mock(UserObjectSerializationContext.class)); var perSessionSerializationService = new PerSessionSerializationService(serializationService); - final var decoder = new InboundDecoder(perSessionSerializationService); + final var decoder = new InboundDecoder(messageFormat, perSessionSerializationService); final var list = new ArrayList<>(); - var writer = new DirectMessageWriter(registry, ConnectionManager.DIRECT_PROTOCOL_VERSION); + MessageWriter writer = messageFormat.writer(registry, ConnectionManager.DIRECT_PROTOCOL_VERSION); var msg = new TestMessagesFactory().testMessage().msg("abcdefghijklmn").build(); diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java index 3e21902f078..3e6ee0bd3a9 100644 --- a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java +++ b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java @@ -43,6 +43,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.apache.ignite.internal.network.NaiveMessageFormat; import org.apache.ignite.internal.network.OutNetworkObject; import org.apache.ignite.internal.network.message.ClassDescriptorMessage; import org.apache.ignite.internal.network.messages.MessageWithMarshallable; @@ -68,6 +69,8 @@ public class MarshallableTest extends BaseIgniteAbstractTest { private final TestMessagesFactory msgFactory = new TestMessagesFactory(); + private final MessageFormat messageFormat = new NaiveMessageFormat(); + /** * Tests that marshallable object can be serialized along with its descriptor. */ @@ -95,7 +98,7 @@ public class MarshallableTest extends BaseIgniteAbstractTest { var channel = new EmbeddedChannel( new ChunkedWriteHandler(), - new OutboundEncoder(serializers.perSessionSerializationService) + new OutboundEncoder(messageFormat, serializers.perSessionSerializationService) ); List<ClassDescriptorMessage> classDescriptorsMessages = PerSessionSerializationService.createClassDescriptorsMessages( @@ -134,7 +137,7 @@ public class MarshallableTest extends BaseIgniteAbstractTest { PerSessionSerializationService perSessionSerializationService = serializers.perSessionSerializationService; ClassDescriptor descriptor = serializers.descriptor; - final var decoder = new InboundDecoder(perSessionSerializationService); + final var decoder = new InboundDecoder(messageFormat, perSessionSerializationService); int size = outBuffer.position();