This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d6212ea6b2 IGNITE-23631 Optimize NetworkMessageChunkedInput constructor (#4689)
d6212ea6b2 is described below
commit d6212ea6b213c501c4ab796d4e5feded33fdd885
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Nov 8 14:25:36 2024 +0400
IGNITE-23631 Optimize NetworkMessageChunkedInput constructor (#4689)
---
.../internal/network/netty/OutboundEncoder.java | 36 ++++++++++++----------
1 file changed, 19 insertions(+), 17 deletions(-)
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
index bf63ed9182..f3861cbd28 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
@@ -23,8 +23,8 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.stream.ChunkedInput;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
-import java.util.stream.Collectors;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.OutNetworkObject;
@@ -57,7 +57,6 @@ public class OutboundEncoder extends MessageToMessageEncoder<OutNetworkObject> {
this.serializationService = serializationService;
}
- /** {@inheritDoc} */
@Override
protected void encode(ChannelHandlerContext ctx, OutNetworkObject msg, List<Object> out) throws Exception {
out.add(new NetworkMessageChunkedInput(msg, serializationService));
@@ -98,17 +97,26 @@ public class OutboundEncoder extends MessageToMessageEncoder<OutNetworkObject> {
this.serializationService = serializationService;
this.msg = outObject.networkMessage();
- List<ClassDescriptorMessage> outDescriptors = outObject.descriptors().stream()
- .filter(classDescriptorMessage -> !serializationService.isDescriptorSent(classDescriptorMessage.descriptorId()))
- .collect(Collectors.toList());
+ List<ClassDescriptorMessage> outDescriptors = null;
+ List<ClassDescriptorMessage> outObjectDescriptors = outObject.descriptors();
+ //noinspection ForLoopReplaceableByForEach
+ for (int i = 0, descriptorsedSize = outObjectDescriptors.size(); i < descriptorsedSize; i++) {
+ ClassDescriptorMessage classDescriptorMessage = outObjectDescriptors.get(i);
+ if (!serializationService.isDescriptorSent(classDescriptorMessage.descriptorId())) {
+ if (outDescriptors == null) {
+ outDescriptors = new ArrayList<>(outObject.descriptors().size());
+ }
+ outDescriptors.add(classDescriptorMessage);
+ }
+ }
- if (!outDescriptors.isEmpty()) {
+ if (outDescriptors != null) {
this.descriptors = MSG_FACTORY.classDescriptorListMessage().messages(outDescriptors).build();
short groupType = this.descriptors.groupType();
short messageType = this.descriptors.messageType();
descriptorSerializer = serializationService.createMessageSerializer(groupType, messageType);
} else {
- descriptors = null;
+ this.descriptors = null;
descriptorSerializer = null;
descriptorsFinished = true;
}
@@ -117,26 +125,22 @@ public class OutboundEncoder extends MessageToMessageEncoder<OutNetworkObject> {
this.writer = new DirectMessageWriter(serializationService.serializationRegistry(), ConnectionManager.DIRECT_PROTOCOL_VERSION);
}
- /** {@inheritDoc} */
@Override
- public boolean isEndOfInput() throws Exception {
+ public boolean isEndOfInput() {
return finished;
}
- /** {@inheritDoc} */
@Override
- public void close() throws Exception {
-
+ public void close() {
+ // No-op.
}
- /** {@inheritDoc} */
@Deprecated
@Override
- public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
+ public ByteBuf readChunk(ChannelHandlerContext ctx) {
return readChunk(ctx.alloc());
}
- /** {@inheritDoc} */
@Override
public ByteBuf readChunk(ByteBufAllocator allocator) {
ByteBuf buffer = allocator.ioBuffer(IO_BUFFER_CAPACITY);
@@ -170,14 +174,12 @@ public class OutboundEncoder extends MessageToMessageEncoder<OutNetworkObject> {
return buffer;
}
- /** {@inheritDoc} */
@Override
public long length() {
// Return negative values, because object's size is unknown.
return -1;
}
- /** {@inheritDoc} */
@Override
public long progress() {
// Not really needed, as there won't be listeners for the write operation's progress.