This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new d7df154b702 IGNITE-28294 Fixed nested Java collections serialization (#12917)
d7df154b702 is described below

commit d7df154b70229853b7ed8fa38303827743024355
Author: Aleksandr Chesnokov <[email protected]>
AuthorDate: Wed Mar 25 11:52:48 2026 +0300

    IGNITE-28294 Fixed nested Java collections serialization (#12917)
---
 .../internal/direct/DirectMessageReader.java       |   4 +-
 .../internal/direct/DirectMessageWriter.java       |   4 +-
 .../direct/stream/DirectByteBufferStream.java      |  69 ++++++-----
 .../extensions/communication/MessageReader.java    |  10 +-
 .../extensions/communication/MessageWriter.java    |  10 +-
 .../direct/DirectMarshallingMessagesTest.java      | 138 +++++++++++++++++++++
 .../direct/TestNestedContainersMessage.java        |  52 ++++++++
 .../AbstractMessageSerializationTest.java          |   8 +-
 .../testsuites/IgniteMarshallerSelfTestSuite.java  |   2 +
 9 files changed, 251 insertions(+), 46 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
index cce73aee5b1..d32d179ae55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
@@ -442,14 +442,14 @@ public class DirectMessageReader implements MessageReader {
     }
 
     /** {@inheritDoc} */
-    @Override public void beforeInnerMessageRead() {
+    @Override public void beforeNestedRead() {
         state.forward();
 
         state.item().stream.setBuffer(buf);
     }
 
     /** {@inheritDoc} */
-    @Override public void afterInnerMessageRead(boolean finished) {
+    @Override public void afterNestedRead(boolean finished) {
         state.backward(finished);
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
index db35c93e165..c88e2f5cb26 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
@@ -420,14 +420,14 @@ public class DirectMessageWriter implements MessageWriter {
     }
 
     /** {@inheritDoc} */
-    @Override public void beforeInnerMessageWrite() {
+    @Override public void beforeNestedWrite() {
         state.forward();
 
         state.item().stream.setBuffer(buf);
     }
 
     /** {@inheritDoc} */
-    @Override public void afterInnerMessageWrite(boolean finished) {
+    @Override public void afterNestedWrite(boolean finished) {
         state.backward(finished);
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
index 6890200d5f7..053732880b6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
@@ -28,6 +28,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.RandomAccess;
 import java.util.UUID;
+import java.util.function.BooleanSupplier;
+import java.util.function.Supplier;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.managers.communication.CompressedMessage;
@@ -921,16 +923,8 @@ public class DirectByteBufferStream {
      */
     public void writeMessage(Message msg, MessageWriter writer) {
         if (msg != null) {
-            if (buf.hasRemaining()) {
-                try {
-                    writer.beforeInnerMessageWrite();
-
-                    lastFinished = msgFactory.serializer(msg.directType()).writeTo(msg, writer);
-                }
-                finally {
-                    writer.afterInnerMessageWrite(lastFinished);
-                }
-            }
+            if (buf.hasRemaining())
+                nestedWrite(writer, () -> msgFactory.serializer(msg.directType()).writeTo(msg, writer));
             else
                 lastFinished = false;
         }
@@ -1574,12 +1568,12 @@ public class DirectByteBufferStream {
 
         if (msg != null) {
             try {
-                reader.beforeInnerMessageRead();
+                reader.beforeNestedRead();
 
                 lastFinished = msgFactory.serializer(msg.directType()).readFrom(msg, reader);
             }
             finally {
-                reader.afterInnerMessageRead(lastFinished);
+                reader.afterNestedRead(lastFinished);
             }
         }
         else
@@ -2117,31 +2111,22 @@ public class DirectByteBufferStream {
                 break;
 
             case MAP:
-                writeMap((Map<K, V>)val, (MessageMapType)type, writer);
+                nestedWrite(writer, () -> writer.writeMap((Map<K, V>)val, (MessageMapType)type));
 
                 break;
 
             case COLLECTION:
-                writeCollection((Collection<V>)val, (MessageCollectionType)type, writer);
+                nestedWrite(writer, () -> writer.writeCollection((Collection<V>)val, (MessageCollectionType)type));
 
                 break;
 
             case ARRAY:
-                writeObjectArray((V[])val, (MessageArrayType)type, writer);
+                nestedWrite(writer, () -> writer.writeObjectArray((V[])val, (MessageArrayType)type));
 
                 break;
 
             case MSG:
-                try {
-                    if (val != null)
-                        writer.beforeInnerMessageWrite();
-
-                    writeMessage((Message)val, writer);
-                }
-                finally {
-                    if (val != null)
-                        writer.afterInnerMessageWrite(lastFinished);
-                }
+                writeMessage((Message)val, writer);
 
                 break;
 
@@ -2150,6 +2135,18 @@ public class DirectByteBufferStream {
         }
     }
 
+    /** Performs a nested write with proper writer state enter/exit handling. */
+    private void nestedWrite(MessageWriter writer, BooleanSupplier s) {
+        try {
+            writer.beforeNestedWrite();
+
+            lastFinished = s.getAsBoolean();
+        }
+        finally {
+            writer.afterNestedWrite(lastFinished);
+        }
+    }
+
     /**
      * @param type Type.
      * @param reader Reader.
@@ -2230,13 +2227,13 @@ public class DirectByteBufferStream {
                 return readGridLongList();
 
             case MAP:
-                return readMap((MessageMapType)type, reader);
+                return nestedRead(reader, () -> reader.readMap((MessageMapType)type));
 
             case COLLECTION:
-                return readCollection((MessageCollectionType)type, reader);
+                return nestedRead(reader, () -> reader.readCollection((MessageCollectionType)type));
 
             case ARRAY:
-                return readObjectArray((MessageArrayType)type, reader);
+                return nestedRead(reader, () -> reader.readObjectArray((MessageArrayType)type));
 
             case MSG:
                 return readMessage(reader);
@@ -2246,6 +2243,22 @@ public class DirectByteBufferStream {
         }
     }
 
+    /** Performs a nested read with proper reader state management. */
+    private <R> R nestedRead(MessageReader reader, Supplier<R> s) {
+        try {
+            reader.beforeNestedRead();
+
+            R r = s.get();
+
+            lastFinished = reader.isLastRead();
+
+            return r;
+        }
+        finally {
+            reader.afterNestedRead(lastFinished);
+        }
+    }
+
     /** */
     private void writeUuidRaw(UUID val) {
         lastFinished = buf.remaining() >= 16;
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
index 48cf3355ce6..f2190a1e86a 100644
--- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
@@ -297,16 +297,16 @@ public interface MessageReader {
     public void decrementState();
 
     /**
-     * Callback called before inner message is read.
+     * Callback called before nested object is read.
      */
-    public void beforeInnerMessageRead();
+    public void beforeNestedRead();
 
     /**
-     * Callback called after inner message is read.
+     * Callback called after nested object is read.
      *
-     * @param finished Whether message was fully read.
+     * @param finished Whether object was fully read.
      */
-    public void afterInnerMessageRead(boolean finished);
+    public void afterNestedRead(boolean finished);
 
     /**
      * Resets this reader.
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
index a189a195b8e..c29e3ac093d 100644
--- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
@@ -356,16 +356,16 @@ public interface MessageWriter {
     public void decrementState();
 
     /**
-     * Callback called before inner message is written.
+     * Callback called before nested object is written.
      */
-    public void beforeInnerMessageWrite();
+    public void beforeNestedWrite();
 
     /**
-     * Callback called after inner message is written.
+     * Callback called after nested object is written.
      *
-     * @param finished Whether message was fully written.
+     * @param finished Whether object was fully written.
      */
-    public void afterInnerMessageWrite(boolean finished);
+    public void afterNestedWrite(boolean finished);
 
     /**
      * Resets this writer.
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java
new file mode 100644
index 00000000000..cd7fccd16fd
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.direct;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.marshaller.Marshallers.jdk;
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Messages marshalling test.
+ */
+public class DirectMarshallingMessagesTest extends GridCommonAbstractTest {
+    /**
+     * Size of chunk for marshalling.
+     * <p>
+     * Should be small to ensure message is written/read in parts.
+     */
+    private static final int CHUNK_SIZE = 16;
+
+    /** Message factory. */
+    private final MessageFactory msgFactory =
+        new IgniteMessageFactoryImpl(new MessageFactoryProvider[] {
+            new GridIoMessageFactory(jdk(), U.gridClassLoader()),
+            factory -> factory.register(
+                TestNestedContainersMessage.TYPE,
+                TestNestedContainersMessage::new,
+                new TestNestedContainersMessageSerializer()
+            )
+        });
+
+    /** */
+    @Test
+    public void testNestedContainers() {
+        TestNestedContainersMessage msg = new TestNestedContainersMessage();
+
+        msg.nestedMap = Map.of(
+            1, Map.of(1, 2L),
+            2, Map.of(1, 2L)
+        );
+
+        msg.nestedCollection = Map.of(
+            1, Arrays.asList(1),
+            2, Arrays.asList(2)
+        );
+
+        msg.nestedArr = Map.of(
+            1, new String[]{"AAA", "AAA"},
+            2, new String[]{"BBB", "BBB"}
+        );
+
+        TestNestedContainersMessage resMsg = doMarshalUnmarshalChunked(msg);
+
+        assertEquals(msg.nestedMap, resMsg.nestedMap);
+        assertEquals(msg.nestedCollection, resMsg.nestedCollection);
+        assertArrayEquals(msg.nestedArr.get(1), resMsg.nestedArr.get(1));
+        assertArrayEquals(msg.nestedArr.get(2), resMsg.nestedArr.get(2));
+    }
+
+    /**
+     * @param srcMsg Message to marshal.
+     * @param <T> Message type.
+     * @return Unmarshalled message.
+     */
+    private <T extends Message> T doMarshalUnmarshalChunked(T srcMsg) {
+        ByteBuffer buf = ByteBuffer.allocate(256);
+
+        DirectMessageWriter writer = new DirectMessageWriter(msgFactory);
+
+        boolean fullyWritten = false;
+
+        while (!fullyWritten) {
+            ByteBuffer chunk = ByteBuffer.allocate(CHUNK_SIZE);
+
+            writer.setBuffer(chunk);
+
+            fullyWritten = writer.writeMessage(srcMsg, false);
+
+            chunk.flip();
+
+            buf.put(chunk);
+        }
+
+        byte[] bytes = new byte[buf.position()];
+
+        buf.flip();
+        buf.get(bytes);
+
+        DirectMessageReader reader = new DirectMessageReader(msgFactory, null);
+
+        Message resMsg = null;
+
+        int pos = 0;
+
+        while (resMsg == null) {
+            int len = Math.min(CHUNK_SIZE, bytes.length - pos);
+
+            ByteBuffer chunk = ByteBuffer.allocate(len);
+
+            chunk.put(bytes, pos, len);
+
+            chunk.flip();
+
+            reader.setBuffer(chunk);
+
+            resMsg = reader.readMessage(false);
+
+            pos += chunk.position();
+        }
+
+        return (T)resMsg;
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/direct/TestNestedContainersMessage.java b/modules/core/src/test/java/org/apache/ignite/internal/direct/TestNestedContainersMessage.java
new file mode 100644
index 00000000000..eecdadd5cd6
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/direct/TestNestedContainersMessage.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.direct;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+
+/** */
+class TestNestedContainersMessage implements Message {
+    /** */
+    public static final short TYPE = Short.MAX_VALUE;
+
+    /** */
+    @Order(0)
+    Map<Integer, Map<Integer, Long>> nestedMap;
+
+    /** */
+    @Order(1)
+    Map<Integer, List<Integer>> nestedCollection;
+
+    /** */
+    @Order(2)
+    Map<Integer, String[]> nestedArr;
+
+    /** Default constructor for {@link MessageFactory}. */
+    public TestNestedContainersMessage() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return TYPE;
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java
index 1104b04e217..e003670f18f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java
@@ -333,10 +333,10 @@ public abstract class AbstractMessageSerializationTest {
         }
 
         /** {@inheritDoc} */
-        @Override public void beforeInnerMessageWrite() {}
+        @Override public void beforeNestedWrite() {}
 
         /** {@inheritDoc} */
-        @Override public void afterInnerMessageWrite(boolean finished) {}
+        @Override public void afterNestedWrite(boolean finished) {}
 
         /** {@inheritDoc} */
         @Override public void reset() {
@@ -596,10 +596,10 @@ public abstract class AbstractMessageSerializationTest {
         }
 
         /** {@inheritDoc} */
-        @Override public void beforeInnerMessageRead() {}
+        @Override public void beforeNestedRead() {}
 
         /** {@inheritDoc} */
-        @Override public void afterInnerMessageRead(boolean finished) {}
+        @Override public void afterNestedRead(boolean finished) {}
 
         /** {@inheritDoc} */
         @Override public void reset() {
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteMarshallerSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteMarshallerSelfTestSuite.java
index aa268afe0fa..3872d67eec6 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteMarshallerSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteMarshallerSelfTestSuite.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.testsuites;
 
+import org.apache.ignite.internal.direct.DirectMarshallingMessagesTest;
 import org.apache.ignite.internal.direct.stream.DirectByteBufferStreamImplByteOrderSelfTest;
 import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshallerEnumSelfTest;
 import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshallerPooledSelfTest;
@@ -50,6 +51,7 @@ import org.junit.runners.Suite;
     GridHandleTableSelfTest.class,
     OptimizedMarshallerPooledSelfTest.class,
     MarshallerEnumDeadlockMultiJvmTest.class,
+    DirectMarshallingMessagesTest.class,
     ObjectInputStreamFilteringTest.class,
 })
 public class IgniteMarshallerSelfTestSuite {

Reply via email to