This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d6212ea6b2 IGNITE-23631 Optimize NetworkMessageChunkedInput constructor (#4689)
d6212ea6b2 is described below

commit d6212ea6b213c501c4ab796d4e5feded33fdd885
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Nov 8 14:25:36 2024 +0400

    IGNITE-23631 Optimize NetworkMessageChunkedInput constructor (#4689)
---
 .../internal/network/netty/OutboundEncoder.java    | 36 ++++++++++++----------
 1 file changed, 19 insertions(+), 17 deletions(-)

diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
index bf63ed9182..f3861cbd28 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
@@ -23,8 +23,8 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToMessageEncoder;
 import io.netty.handler.stream.ChunkedInput;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.Collectors;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.OutNetworkObject;
@@ -57,7 +57,6 @@ public class OutboundEncoder extends MessageToMessageEncoder<OutNetworkObject> {
         this.serializationService = serializationService;
     }
 
-    /** {@inheritDoc} */
     @Override
     protected void encode(ChannelHandlerContext ctx, OutNetworkObject msg, List<Object> out) throws Exception {
         out.add(new NetworkMessageChunkedInput(msg, serializationService));
@@ -98,17 +97,26 @@ public class OutboundEncoder extends MessageToMessageEncoder<OutNetworkObject> {
             this.serializationService = serializationService;
             this.msg = outObject.networkMessage();
 
-            List<ClassDescriptorMessage> outDescriptors = outObject.descriptors().stream()
-                    .filter(classDescriptorMessage -> !serializationService.isDescriptorSent(classDescriptorMessage.descriptorId()))
-                    .collect(Collectors.toList());
+            List<ClassDescriptorMessage> outDescriptors = null;
+            List<ClassDescriptorMessage> outObjectDescriptors = outObject.descriptors();
+            //noinspection ForLoopReplaceableByForEach
+            for (int i = 0, descriptorsedSize = outObjectDescriptors.size(); i < descriptorsedSize; i++) {
+                ClassDescriptorMessage classDescriptorMessage = outObjectDescriptors.get(i);
+                if (!serializationService.isDescriptorSent(classDescriptorMessage.descriptorId())) {
+                    if (outDescriptors == null) {
+                        outDescriptors = new ArrayList<>(outObject.descriptors().size());
+                    }
+                    outDescriptors.add(classDescriptorMessage);
+                }
+            }
 
-            if (!outDescriptors.isEmpty()) {
+            if (outDescriptors != null) {
                 this.descriptors = MSG_FACTORY.classDescriptorListMessage().messages(outDescriptors).build();
                 short groupType = this.descriptors.groupType();
                 short messageType = this.descriptors.messageType();
                 descriptorSerializer = serializationService.createMessageSerializer(groupType, messageType);
             } else {
-                descriptors = null;
+                this.descriptors = null;
                 descriptorSerializer = null;
                 descriptorsFinished = true;
             }
@@ -117,26 +125,22 @@ public class OutboundEncoder extends MessageToMessageEncoder<OutNetworkObject> {
             this.writer = new DirectMessageWriter(serializationService.serializationRegistry(), ConnectionManager.DIRECT_PROTOCOL_VERSION);
         }
 
-        /** {@inheritDoc} */
         @Override
-        public boolean isEndOfInput() throws Exception {
+        public boolean isEndOfInput() {
             return finished;
         }
 
-        /** {@inheritDoc} */
         @Override
-        public void close() throws Exception {
-
+        public void close() {
+            // No-op.
         }
 
-        /** {@inheritDoc} */
         @Deprecated
         @Override
-        public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
+        public ByteBuf readChunk(ChannelHandlerContext ctx) {
             return readChunk(ctx.alloc());
         }
 
-        /** {@inheritDoc} */
         @Override
         public ByteBuf readChunk(ByteBufAllocator allocator) {
             ByteBuf buffer = allocator.ioBuffer(IO_BUFFER_CAPACITY);
@@ -170,14 +174,12 @@ public class OutboundEncoder extends MessageToMessageEncoder<OutNetworkObject> {
             return buffer;
         }
 
-        /** {@inheritDoc} */
         @Override
         public long length() {
             // Return negative values, because object's size is unknown.
             return -1;
         }
 
-        /** {@inheritDoc} */
         @Override
         public long progress() {
             // Not really needed, as there won't be listeners for the write operation's progress.

Reply via email to