This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new d7df154b702 IGNITE-28294 Fixed nested Java collections serialization
(#12917)
d7df154b702 is described below
commit d7df154b70229853b7ed8fa38303827743024355
Author: Aleksandr Chesnokov <[email protected]>
AuthorDate: Wed Mar 25 11:52:48 2026 +0300
IGNITE-28294 Fixed nested Java collections serialization (#12917)
---
.../internal/direct/DirectMessageReader.java | 4 +-
.../internal/direct/DirectMessageWriter.java | 4 +-
.../direct/stream/DirectByteBufferStream.java | 69 ++++++-----
.../extensions/communication/MessageReader.java | 10 +-
.../extensions/communication/MessageWriter.java | 10 +-
.../direct/DirectMarshallingMessagesTest.java | 138 +++++++++++++++++++++
.../direct/TestNestedContainersMessage.java | 52 ++++++++
.../AbstractMessageSerializationTest.java | 8 +-
.../testsuites/IgniteMarshallerSelfTestSuite.java | 2 +
9 files changed, 251 insertions(+), 46 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
index cce73aee5b1..d32d179ae55 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
@@ -442,14 +442,14 @@ public class DirectMessageReader implements MessageReader
{
}
/** {@inheritDoc} */
- @Override public void beforeInnerMessageRead() {
+ @Override public void beforeNestedRead() {
state.forward();
state.item().stream.setBuffer(buf);
}
/** {@inheritDoc} */
- @Override public void afterInnerMessageRead(boolean finished) {
+ @Override public void afterNestedRead(boolean finished) {
state.backward(finished);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
index db35c93e165..c88e2f5cb26 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
@@ -420,14 +420,14 @@ public class DirectMessageWriter implements MessageWriter
{
}
/** {@inheritDoc} */
- @Override public void beforeInnerMessageWrite() {
+ @Override public void beforeNestedWrite() {
state.forward();
state.item().stream.setBuffer(buf);
}
/** {@inheritDoc} */
- @Override public void afterInnerMessageWrite(boolean finished) {
+ @Override public void afterNestedWrite(boolean finished) {
state.backward(finished);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
index 6890200d5f7..053732880b6 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
@@ -28,6 +28,8 @@ import java.util.List;
import java.util.Map;
import java.util.RandomAccess;
import java.util.UUID;
+import java.util.function.BooleanSupplier;
+import java.util.function.Supplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.managers.communication.CompressedMessage;
@@ -921,16 +923,8 @@ public class DirectByteBufferStream {
*/
public void writeMessage(Message msg, MessageWriter writer) {
if (msg != null) {
- if (buf.hasRemaining()) {
- try {
- writer.beforeInnerMessageWrite();
-
- lastFinished =
msgFactory.serializer(msg.directType()).writeTo(msg, writer);
- }
- finally {
- writer.afterInnerMessageWrite(lastFinished);
- }
- }
+ if (buf.hasRemaining())
+ nestedWrite(writer, () ->
msgFactory.serializer(msg.directType()).writeTo(msg, writer));
else
lastFinished = false;
}
@@ -1574,12 +1568,12 @@ public class DirectByteBufferStream {
if (msg != null) {
try {
- reader.beforeInnerMessageRead();
+ reader.beforeNestedRead();
lastFinished =
msgFactory.serializer(msg.directType()).readFrom(msg, reader);
}
finally {
- reader.afterInnerMessageRead(lastFinished);
+ reader.afterNestedRead(lastFinished);
}
}
else
@@ -2117,31 +2111,22 @@ public class DirectByteBufferStream {
break;
case MAP:
- writeMap((Map<K, V>)val, (MessageMapType)type, writer);
+ nestedWrite(writer, () -> writer.writeMap((Map<K, V>)val,
(MessageMapType)type));
break;
case COLLECTION:
- writeCollection((Collection<V>)val,
(MessageCollectionType)type, writer);
+ nestedWrite(writer, () ->
writer.writeCollection((Collection<V>)val, (MessageCollectionType)type));
break;
case ARRAY:
- writeObjectArray((V[])val, (MessageArrayType)type, writer);
+ nestedWrite(writer, () -> writer.writeObjectArray((V[])val,
(MessageArrayType)type));
break;
case MSG:
- try {
- if (val != null)
- writer.beforeInnerMessageWrite();
-
- writeMessage((Message)val, writer);
- }
- finally {
- if (val != null)
- writer.afterInnerMessageWrite(lastFinished);
- }
+ writeMessage((Message)val, writer);
break;
@@ -2150,6 +2135,18 @@ public class DirectByteBufferStream {
}
}
+ /** Performs a nested write with proper writer state enter/exit handling.
*/
+ private void nestedWrite(MessageWriter writer, BooleanSupplier s) {
+ try {
+ writer.beforeNestedWrite();
+
+ lastFinished = s.getAsBoolean();
+ }
+ finally {
+ writer.afterNestedWrite(lastFinished);
+ }
+ }
+
/**
* @param type Type.
* @param reader Reader.
@@ -2230,13 +2227,13 @@ public class DirectByteBufferStream {
return readGridLongList();
case MAP:
- return readMap((MessageMapType)type, reader);
+ return nestedRead(reader, () ->
reader.readMap((MessageMapType)type));
case COLLECTION:
- return readCollection((MessageCollectionType)type, reader);
+ return nestedRead(reader, () ->
reader.readCollection((MessageCollectionType)type));
case ARRAY:
- return readObjectArray((MessageArrayType)type, reader);
+ return nestedRead(reader, () ->
reader.readObjectArray((MessageArrayType)type));
case MSG:
return readMessage(reader);
@@ -2246,6 +2243,22 @@ public class DirectByteBufferStream {
}
}
+ /** Performs a nested read with proper reader state management. */
+ private <R> R nestedRead(MessageReader reader, Supplier<R> s) {
+ try {
+ reader.beforeNestedRead();
+
+ R r = s.get();
+
+ lastFinished = reader.isLastRead();
+
+ return r;
+ }
+ finally {
+ reader.afterNestedRead(lastFinished);
+ }
+ }
+
/** */
private void writeUuidRaw(UUID val) {
lastFinished = buf.remaining() >= 16;
diff --git
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
index 48cf3355ce6..f2190a1e86a 100644
---
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
+++
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
@@ -297,16 +297,16 @@ public interface MessageReader {
public void decrementState();
/**
- * Callback called before inner message is read.
+ * Callback called before nested object is read.
*/
- public void beforeInnerMessageRead();
+ public void beforeNestedRead();
/**
- * Callback called after inner message is read.
+ * Callback called after nested object is read.
*
- * @param finished Whether message was fully read.
+ * @param finished Whether object was fully read.
*/
- public void afterInnerMessageRead(boolean finished);
+ public void afterNestedRead(boolean finished);
/**
* Resets this reader.
diff --git
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
index a189a195b8e..c29e3ac093d 100644
---
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
+++
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
@@ -356,16 +356,16 @@ public interface MessageWriter {
public void decrementState();
/**
- * Callback called before inner message is written.
+ * Callback called before nested object is written.
*/
- public void beforeInnerMessageWrite();
+ public void beforeNestedWrite();
/**
- * Callback called after inner message is written.
+ * Callback called after nested object is written.
*
- * @param finished Whether message was fully written.
+ * @param finished Whether object was fully written.
*/
- public void afterInnerMessageWrite(boolean finished);
+ public void afterNestedWrite(boolean finished);
/**
* Resets this writer.
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java
new file mode 100644
index 00000000000..cd7fccd16fd
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.direct;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import
org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.marshaller.Marshallers.jdk;
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Messages marshalling test.
+ */
+public class DirectMarshallingMessagesTest extends GridCommonAbstractTest {
+ /**
+ * Size of chunk for marshalling.
+ * <p>
+ * Should be small to ensure message is written/read in parts.
+ */
+ private static final int CHUNK_SIZE = 16;
+
+ /** Message factory. */
+ private final MessageFactory msgFactory =
+ new IgniteMessageFactoryImpl(new MessageFactoryProvider[] {
+ new GridIoMessageFactory(jdk(), U.gridClassLoader()),
+ factory -> factory.register(
+ TestNestedContainersMessage.TYPE,
+ TestNestedContainersMessage::new,
+ new TestNestedContainersMessageSerializer()
+ )
+ });
+
+ /** */
+ @Test
+ public void testNestedContainers() {
+ TestNestedContainersMessage msg = new TestNestedContainersMessage();
+
+ msg.nestedMap = Map.of(
+ 1, Map.of(1, 2L),
+ 2, Map.of(1, 2L)
+ );
+
+ msg.nestedCollection = Map.of(
+ 1, Arrays.asList(1),
+ 2, Arrays.asList(2)
+ );
+
+ msg.nestedArr = Map.of(
+ 1, new String[]{"AAA", "AAA"},
+ 2, new String[]{"BBB", "BBB"}
+ );
+
+ TestNestedContainersMessage resMsg = doMarshalUnmarshalChunked(msg);
+
+ assertEquals(msg.nestedMap, resMsg.nestedMap);
+ assertEquals(msg.nestedCollection, resMsg.nestedCollection);
+ assertArrayEquals(msg.nestedArr.get(1), resMsg.nestedArr.get(1));
+ assertArrayEquals(msg.nestedArr.get(2), resMsg.nestedArr.get(2));
+ }
+
+ /**
+ * @param srcMsg Message to marshal.
+ * @param <T> Message type.
+ * @return Unmarshalled message.
+ */
+ private <T extends Message> T doMarshalUnmarshalChunked(T srcMsg) {
+ ByteBuffer buf = ByteBuffer.allocate(256);
+
+ DirectMessageWriter writer = new DirectMessageWriter(msgFactory);
+
+ boolean fullyWritten = false;
+
+ while (!fullyWritten) {
+ ByteBuffer chunk = ByteBuffer.allocate(CHUNK_SIZE);
+
+ writer.setBuffer(chunk);
+
+ fullyWritten = writer.writeMessage(srcMsg, false);
+
+ chunk.flip();
+
+ buf.put(chunk);
+ }
+
+ byte[] bytes = new byte[buf.position()];
+
+ buf.flip();
+ buf.get(bytes);
+
+ DirectMessageReader reader = new DirectMessageReader(msgFactory, null);
+
+ Message resMsg = null;
+
+ int pos = 0;
+
+ while (resMsg == null) {
+ int len = Math.min(CHUNK_SIZE, bytes.length - pos);
+
+ ByteBuffer chunk = ByteBuffer.allocate(len);
+
+ chunk.put(bytes, pos, len);
+
+ chunk.flip();
+
+ reader.setBuffer(chunk);
+
+ resMsg = reader.readMessage(false);
+
+ pos += chunk.position();
+ }
+
+ return (T)resMsg;
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/direct/TestNestedContainersMessage.java
b/modules/core/src/test/java/org/apache/ignite/internal/direct/TestNestedContainersMessage.java
new file mode 100644
index 00000000000..eecdadd5cd6
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/direct/TestNestedContainersMessage.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.direct;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+
+/** */
+class TestNestedContainersMessage implements Message {
+ /** */
+ public static final short TYPE = Short.MAX_VALUE;
+
+ /** */
+ @Order(0)
+ Map<Integer, Map<Integer, Long>> nestedMap;
+
+ /** */
+ @Order(1)
+ Map<Integer, List<Integer>> nestedCollection;
+
+ /** */
+ @Order(2)
+ Map<Integer, String[]> nestedArr;
+
+ /** Default constructor for {@link MessageFactory}. */
+ public TestNestedContainersMessage() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TYPE;
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java
index 1104b04e217..e003670f18f 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java
@@ -333,10 +333,10 @@ public abstract class AbstractMessageSerializationTest {
}
/** {@inheritDoc} */
- @Override public void beforeInnerMessageWrite() {}
+ @Override public void beforeNestedWrite() {}
/** {@inheritDoc} */
- @Override public void afterInnerMessageWrite(boolean finished) {}
+ @Override public void afterNestedWrite(boolean finished) {}
/** {@inheritDoc} */
@Override public void reset() {
@@ -596,10 +596,10 @@ public abstract class AbstractMessageSerializationTest {
}
/** {@inheritDoc} */
- @Override public void beforeInnerMessageRead() {}
+ @Override public void beforeNestedRead() {}
/** {@inheritDoc} */
- @Override public void afterInnerMessageRead(boolean finished) {}
+ @Override public void afterNestedRead(boolean finished) {}
/** {@inheritDoc} */
@Override public void reset() {
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteMarshallerSelfTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteMarshallerSelfTestSuite.java
index aa268afe0fa..3872d67eec6 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteMarshallerSelfTestSuite.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteMarshallerSelfTestSuite.java
@@ -17,6 +17,7 @@
package org.apache.ignite.testsuites;
+import org.apache.ignite.internal.direct.DirectMarshallingMessagesTest;
import
org.apache.ignite.internal.direct.stream.DirectByteBufferStreamImplByteOrderSelfTest;
import
org.apache.ignite.internal.marshaller.optimized.OptimizedMarshallerEnumSelfTest;
import
org.apache.ignite.internal.marshaller.optimized.OptimizedMarshallerPooledSelfTest;
@@ -50,6 +51,7 @@ import org.junit.runners.Suite;
GridHandleTableSelfTest.class,
OptimizedMarshallerPooledSelfTest.class,
MarshallerEnumDeadlockMultiJvmTest.class,
+ DirectMarshallingMessagesTest.class,
ObjectInputStreamFilteringTest.class,
})
public class IgniteMarshallerSelfTestSuite {