This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new dcb16bc6977 KAFKA-18199; Fix size calculation for nullable tagged structs (#18127)
dcb16bc6977 is described below

commit dcb16bc6977a0a6d642275ba9f8248ffcab5576b
Author: Sean Quah <[email protected]>
AuthorDate: Fri Dec 13 12:31:53 2024 +0000

    KAFKA-18199; Fix size calculation for nullable tagged structs (#18127)
    
    When a struct field is tagged and nullable, it is serialized as
    { varint tag; varint dataLength; nullable data }, where
    the nullable data is serialized as
    { varint isNotNull; if (isNotNull) struct s; }. The length field
    includes the is-not-null varint.
    
    This patch fixes a bug in serialization where the written value of
    the length field and the value used to compute the size of the length
    field differ by 1. In practice this has no impact unless the
    serialized length of the struct is 127 bytes, since the varint encodings
    of 127 and 128 have different lengths (0x7f vs 0x80 01).
    
    Reviewers: David Jacot <[email protected]>
---
 .../common/message/NullableStructMessageTest.java  | 26 ++++++++++++++++++++++
 .../common/message/SimpleExampleMessageTest.java   |  2 ++
 .../apache/kafka/message/MessageDataGenerator.java | 19 ++++++++++------
 3 files changed, 40 insertions(+), 7 deletions(-)

diff --git 
a/clients/src/test/java/org/apache/kafka/common/message/NullableStructMessageTest.java
 
b/clients/src/test/java/org/apache/kafka/common/message/NullableStructMessageTest.java
index a171520b2c3..1dbc579db03 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/message/NullableStructMessageTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/message/NullableStructMessageTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.message;
 
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.MessageUtil;
+import org.apache.kafka.common.protocol.ObjectSerializationCache;
 
 import org.junit.jupiter.api.Test;
 
@@ -98,6 +99,29 @@ public class NullableStructMessageTest {
         message.toString();
     }
 
+    /**
+     * Regression test for KAFKA-18199. Tests that the size of the varint 
encoding a tagged nullable
+     * struct's size is calculated correctly.
+     */
+    @Test
+    public void testTaggedStructSize() {
+        NullableStructMessageData message = new NullableStructMessageData()
+            .setNullableStruct(null)
+            .setNullableStruct2(null)
+            .setNullableStruct3(null)
+            .setNullableStruct4(new NullableStructMessageData.MyStruct4()
+                .setMyInt(4)
+                .setMyString(new String(new char[121])));
+
+        // We want the struct to be 127 bytes long, so that the varint 
encoding of its size is one
+        // short of overflowing into a two-byte representation. An extra byte 
is added to the
+        // nullable struct size to account for the is-not-null flag.
+        assertEquals(127, message.nullableStruct4().size(new 
ObjectSerializationCache(), (short) 2));
+
+        NullableStructMessageData newMessage = roundTrip(message, (short) 2);
+        assertEquals(message, newMessage);
+    }
+
     private NullableStructMessageData deserialize(ByteBuffer buf, short 
version) {
         NullableStructMessageData message = new NullableStructMessageData();
         message.read(new ByteBufferAccessor(buf.duplicate()), version);
@@ -110,6 +134,8 @@ public class NullableStructMessageTest {
 
     private NullableStructMessageData roundTrip(NullableStructMessageData 
message, short version) {
         ByteBuffer buffer = serialize(message, version);
+        // Check size calculation
+        assertEquals(buffer.remaining(), message.size(new 
ObjectSerializationCache(), version));
         return deserialize(buffer.duplicate(), version);
     }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
 
b/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
index 9fc49f95060..341e327cda9 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
@@ -360,6 +360,8 @@ public class SimpleExampleMessageTest {
         short version
     ) {
         ByteBuffer buf = MessageUtil.toByteBuffer(message, version);
+        // Check size calculation
+        assertEquals(buf.remaining(), message.size(new 
ObjectSerializationCache(), version));
         return deserialize(buf.duplicate(), version);
     }
 
diff --git 
a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java 
b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
index 9267b483586..b1315894aa5 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
@@ -1332,19 +1332,24 @@ public final class MessageDataGenerator implements 
MessageClassGenerator {
                         }).
                         generate(buffer);
                 } else if (field.type().isStruct()) {
-                    // Adding a byte if the field is nullable. A byte works 
for both regular and tagged struct fields.
-                    VersionConditional.forVersions(field.nullableVersions(), 
possibleVersions).
-                        ifMember(__ -> {
-                            buffer.printf("_size.addBytes(1);%n");
-                        }).
-                        generate(buffer);
-
                     if (tagged) {
                         buffer.printf("int _sizeBeforeStruct = 
_size.totalSize();%n", field.camelCaseName());
+                        // Add a byte if the field is nullable.
+                        
VersionConditional.forVersions(field.nullableVersions(), possibleVersions).
+                            ifMember(__ -> {
+                                buffer.printf("_size.addBytes(1);%n");
+                            }).
+                            generate(buffer);
                         buffer.printf("this.%s.addSize(_size, _cache, 
_version);%n", field.camelCaseName());
                         buffer.printf("int _structSize = _size.totalSize() - 
_sizeBeforeStruct;%n", field.camelCaseName());
                         
buffer.printf("_size.addBytes(ByteUtils.sizeOfUnsignedVarint(_structSize));%n");
                     } else {
+                        // Add a byte if the field is nullable.
+                        
VersionConditional.forVersions(field.nullableVersions(), possibleVersions).
+                            ifMember(__ -> {
+                                buffer.printf("_size.addBytes(1);%n");
+                            }).
+                            generate(buffer);
                         buffer.printf("this.%s.addSize(_size, _cache, 
_version);%n", field.camelCaseName());
                     }
                 } else {

Reply via email to