This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c22d320a5c KAFKA-12902: Add uint32 type in generator (#10830)
c22d320a5c is described below

commit c22d320a5c85f38506839352d82d5fbbc0749878
Author: dengziming <[email protected]>
AuthorDate: Thu May 26 07:25:16 2022 +0800

    KAFKA-12902: Add uint32 type in generator (#10830)
    
    Add uint32 support in the KRPC generator.
    
    Reviewers: Colin P. McCabe <[email protected]>
---
 checkstyle/suppressions.xml                        |  2 +-
 .../apache/kafka/common/protocol/MessageUtil.java  | 16 ++++++++++++++-
 .../org/apache/kafka/common/protocol/Readable.java |  4 ++++
 .../org/apache/kafka/common/protocol/Writable.java |  4 ++++
 .../apache/kafka/common/protocol/types/Field.java  |  6 ++++++
 .../apache/kafka/common/protocol/types/Struct.java | 21 ++++++++++++++++++-
 .../common/message/SimpleExampleMessageTest.java   | 24 ++++++++++++++++++++--
 .../kafka/common/protocol/MessageUtilTest.java     |  6 ++++++
 .../common/message/SimpleExampleMessage.json       |  7 ++++---
 .../java/org/apache/kafka/message/FieldSpec.java   | 21 ++++++++++++++++++-
 .../java/org/apache/kafka/message/FieldType.java   | 22 ++++++++++++++++++++
 .../kafka/message/JsonConverterGenerator.java      |  8 +++++++-
 .../apache/kafka/message/MessageDataGenerator.java | 19 +++++++++++++++--
 .../org/apache/kafka/message/MessageGenerator.java |  4 ++++
 .../org/apache/kafka/message/SchemaGenerator.java  |  6 ++++++
 .../apache/kafka/message/MessageGeneratorTest.java |  6 ++++++
 16 files changed, 164 insertions(+), 12 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index c5cd99fdaa..10af4e66c2 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -25,7 +25,7 @@
 
     <!-- Generator -->
     <suppress checks="CyclomaticComplexity|BooleanExpressionComplexity"
-              files="(SchemaGenerator|MessageDataGenerator|FieldSpec).java"/>
+              
files="(SchemaGenerator|MessageDataGenerator|FieldSpec|FieldType).java"/>
     <suppress checks="NPathComplexity"
               files="(MessageDataGenerator|FieldSpec|WorkerSinkTask).java"/>
     <suppress checks="JavaNCSS"
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/MessageUtil.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/MessageUtil.java
index 288ffd08f7..b366ebd8ea 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/MessageUtil.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/MessageUtil.java
@@ -29,6 +29,11 @@ import java.util.List;
 
 
 public final class MessageUtil {
+
+    public static final long UNSIGNED_INT_MAX = 4294967295L;
+
+    public static final int UNSIGNED_SHORT_MAX = 65535;
+
     /**
      * Copy a byte buffer into an array.  This will not affect the buffer's
      * position or mark.
@@ -87,13 +92,22 @@ public final class MessageUtil {
 
     public static int jsonNodeToUnsignedShort(JsonNode node, String about) {
         int value = jsonNodeToInt(node, about);
-        if (value < 0 || value > 65535) {
+        if (value < 0 || value > UNSIGNED_SHORT_MAX) {
             throw new RuntimeException(about + ": value " + value +
                 " does not fit in a 16-bit unsigned integer.");
         }
         return value;
     }
 
+    public static long jsonNodeToUnsignedInt(JsonNode node, String about) {
+        long value = jsonNodeToLong(node, about);
+        if (value < 0 || value > UNSIGNED_INT_MAX) {
+            throw new RuntimeException(about + ": value " + value +
+                    " does not fit in a 32-bit unsigned integer.");
+        }
+        return value;
+    }
+
     public static int jsonNodeToInt(JsonNode node, String about) {
         if (node.isInt()) {
             return node.asInt();
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java
index 9c9e461ca8..561696827d 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java
@@ -75,4 +75,8 @@ public interface Readable {
     default int readUnsignedShort() {
         return Short.toUnsignedInt(readShort());
     }
+
+    default long readUnsignedInt() {
+        return Integer.toUnsignedLong(readInt());
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/Writable.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Writable.java
index 8dbec87134..0677340af4 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Writable.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Writable.java
@@ -54,4 +54,8 @@ public interface Writable {
         // ints outside the valid range of a short.
         writeShort((short) i);
     }
+
+    default void writeUnsignedInt(long i) {
+        writeInt((int) i);
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
index 44726f8240..f030387b6f 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
@@ -97,6 +97,12 @@ public class Field {
         }
     }
 
+    public static class Uint32 extends Field {
+        public Uint32(String name, String docString) {
+            super(name, Type.UNSIGNED_INT32, docString, false, null);
+        }
+    }
+
     public static class Float64 extends Field {
         public Float64(String name, String docString) {
             super(name, Type.FLOAT64, docString, false, null);
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 9b9b5e66b6..e39a84137b 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -23,6 +23,9 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Objects;
 
+import static org.apache.kafka.common.protocol.MessageUtil.UNSIGNED_INT_MAX;
+import static org.apache.kafka.common.protocol.MessageUtil.UNSIGNED_SHORT_MAX;
+
 /**
  * A record that can be serialized and deserialized according to a pre-defined 
schema
  */
@@ -97,6 +100,10 @@ public class Struct {
         return getInt(field.name);
     }
 
+    public Long get(Field.Uint32 field) {
+        return getLong(field.name);
+    }
+
     public Short get(Field.Int16 field) {
         return getShort(field.name);
     }
@@ -270,6 +277,10 @@ public class Struct {
         return (Long) get(name);
     }
 
+    public Long getUnsignedInt(BoundField field) {
+        return (Long) get(field);
+    }
+
     public Long getLong(BoundField field) {
         return (Long) get(field);
     }
@@ -400,13 +411,21 @@ public class Struct {
     }
 
     public Struct set(Field.Uint16 def, int value) {
-        if (value < 0 || value > 65535) {
+        if (value < 0 || value > UNSIGNED_SHORT_MAX) {
             throw new RuntimeException("Invalid value for unsigned short for " 
+
                     def.name + ": " + value);
         }
         return set(def.name, value);
     }
 
+    public Struct set(Field.Uint32 def, long value) {
+        if (value < 0 || value > UNSIGNED_INT_MAX) {
+            throw new RuntimeException("Invalid value for unsigned int for " +
+                    def.name + ": " + value);
+        }
+        return set(def.name, value);
+    }
+
     public Struct set(Field.Float64 def, double value) {
         return set(def.name, value);
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
 
b/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
index 1cdafcd0fd..b904eed272 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
@@ -30,6 +30,8 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.function.Consumer;
 
+import static org.apache.kafka.common.protocol.MessageUtil.UNSIGNED_INT_MAX;
+import static org.apache.kafka.common.protocol.MessageUtil.UNSIGNED_SHORT_MAX;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -179,6 +181,18 @@ public class SimpleExampleMessageTest {
             message -> assertEquals((short) 456, message.myInt16()));
     }
 
+    @Test
+    public void testMyUint32() {
+        // Verify that the uint32 field reads as 1234567 when not set.
+        testRoundTrip(new SimpleExampleMessageData(),
+                message -> assertEquals(1234567, message.myUint32()));
+
+        testRoundTrip(new SimpleExampleMessageData().setMyUint32(123),
+                message -> assertEquals(123, message.myUint32()));
+        testRoundTrip(new SimpleExampleMessageData().setMyUint32(60000),
+                message -> assertEquals(60000, message.myUint32()));
+    }
+
     @Test
     public void testMyUint16() {
         // Verify that the uint16 field reads as 33000 when not set.
@@ -206,7 +220,12 @@ public class SimpleExampleMessageTest {
         assertThrows(RuntimeException.class,
             () -> new SimpleExampleMessageData().setMyUint16(-1));
         assertThrows(RuntimeException.class,
-            () -> new SimpleExampleMessageData().setMyUint16(65536));
+            () -> new 
SimpleExampleMessageData().setMyUint16(UNSIGNED_SHORT_MAX + 1));
+
+        assertThrows(RuntimeException.class,
+                () -> new SimpleExampleMessageData().setMyUint32(-1));
+        assertThrows(RuntimeException.class,
+                () -> new 
SimpleExampleMessageData().setMyUint32(UNSIGNED_INT_MAX + 1));
 
         // Verify that the tagged field reads as empty when not set.
         testRoundTrip(new SimpleExampleMessageData(),
@@ -355,6 +374,7 @@ public class SimpleExampleMessageTest {
                 "myTaggedStruct=TaggedStruct(structId=''), " +
                 "myCommonStruct=TestCommonStruct(foo=123, bar=123), " +
                 "myOtherCommonStruct=TestCommonStruct(foo=123, bar=123), " +
-                "myUint16=65535)", message.toString());
+                "myUint16=65535, " +
+                "myUint32=1234567)", message.toString());
     }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/protocol/MessageUtilTest.java 
b/clients/src/test/java/org/apache/kafka/common/protocol/MessageUtilTest.java
index 33dcabb80a..5195f55115 100755
--- 
a/clients/src/test/java/org/apache/kafka/common/protocol/MessageUtilTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/protocol/MessageUtilTest.java
@@ -77,4 +77,10 @@ public final class MessageUtilTest {
             Arrays.asList(new RawTaggedField(1, new byte[] {1}),
                 new RawTaggedField(2, new byte[] {}))));
     }
+
+    @Test
+    public void testConstants() {
+        assertEquals(MessageUtil.UNSIGNED_SHORT_MAX, 0xFFFF);
+        assertEquals(MessageUtil.UNSIGNED_INT_MAX, 0xFFFFFFFFL);
+    }
 }
diff --git 
a/clients/src/test/resources/common/message/SimpleExampleMessage.json 
b/clients/src/test/resources/common/message/SimpleExampleMessage.json
index 342a9b994a..9b9c049593 100644
--- a/clients/src/test/resources/common/message/SimpleExampleMessage.json
+++ b/clients/src/test/resources/common/message/SimpleExampleMessage.json
@@ -50,9 +50,10 @@
       "fields": [
         { "name": "structId", "type": "string", "versions": "2+", "about": 
"String field in struct"}
     ]},
-    { "name":  "myCommonStruct", "type": "TestCommonStruct", "versions": "0+"},
-    { "name":  "myOtherCommonStruct", "type": "TestCommonStruct", "versions": 
"0+"},
-    { "name":  "myUint16", "type": "uint16", "versions": "1+", "default":  
"33000" }
+    { "name": "myCommonStruct", "type": "TestCommonStruct", "versions": "0+"},
+    { "name": "myOtherCommonStruct", "type": "TestCommonStruct", "versions": 
"0+"},
+    { "name": "myUint16", "type": "uint16", "versions": "1+", "default": 
"33000" },
+    { "name": "myUint32", "type": "uint32", "versions": "1+", "default": 
"1234567" }
   ],
   "commonStructs": [
     { "name": "TestCommonStruct", "versions": "0+", "fields": [
diff --git a/generator/src/main/java/org/apache/kafka/message/FieldSpec.java 
b/generator/src/main/java/org/apache/kafka/message/FieldSpec.java
index d15b03cdb9..1853458ee9 100644
--- a/generator/src/main/java/org/apache/kafka/message/FieldSpec.java
+++ b/generator/src/main/java/org/apache/kafka/message/FieldSpec.java
@@ -300,6 +300,7 @@ public final class FieldSpec {
         } else if ((type instanceof FieldType.Int8FieldType) ||
             (type instanceof FieldType.Int16FieldType) ||
             (type instanceof FieldType.Uint16FieldType) ||
+            (type instanceof FieldType.Uint32FieldType) ||
             (type instanceof FieldType.Int32FieldType) ||
             (type instanceof FieldType.Int64FieldType)) {
             int base = 10;
@@ -338,7 +339,7 @@ public final class FieldSpec {
                 } else {
                     try {
                         int value = Integer.valueOf(defaultString, base);
-                        if (value < 0 || value > 65535) {
+                        if (value < 0 || value > 
MessageGenerator.UNSIGNED_SHORT_MAX) {
                             throw new RuntimeException("Invalid default for 
uint16 field " +
                                     name + ": out of range.");
                         }
@@ -348,6 +349,22 @@ public final class FieldSpec {
                     }
                     return fieldDefault;
                 }
+            } else if (type instanceof FieldType.Uint32FieldType) {
+                if (defaultString.isEmpty()) {
+                    return "0";
+                } else {
+                    try {
+                        long value = Long.valueOf(defaultString, base);
+                        if (value < 0 || value > 
MessageGenerator.UNSIGNED_INT_MAX) {
+                            throw new RuntimeException("Invalid default for 
uint32 field " +
+                                    name + ": out of range.");
+                        }
+                    } catch (NumberFormatException e) {
+                        throw new RuntimeException("Invalid default for uint32 
field " +
+                                name + ": " + defaultString, e);
+                    }
+                    return fieldDefault;
+                }
             } else if (type instanceof FieldType.Int32FieldType) {
                 if (defaultString.isEmpty()) {
                     return "0";
@@ -476,6 +493,8 @@ public final class FieldSpec {
             return "short";
         } else if (type instanceof FieldType.Uint16FieldType) {
             return "int";
+        } else if (type instanceof FieldType.Uint32FieldType) {
+            return "long";
         } else if (type instanceof FieldType.Int32FieldType) {
             return "int";
         } else if (type instanceof FieldType.Int64FieldType) {
diff --git a/generator/src/main/java/org/apache/kafka/message/FieldType.java 
b/generator/src/main/java/org/apache/kafka/message/FieldType.java
index e0009c22ea..24b79d47cb 100644
--- a/generator/src/main/java/org/apache/kafka/message/FieldType.java
+++ b/generator/src/main/java/org/apache/kafka/message/FieldType.java
@@ -122,6 +122,26 @@ public interface FieldType {
         }
     }
 
+    final class Uint32FieldType implements FieldType {
+        static final Uint32FieldType INSTANCE = new Uint32FieldType();
+        private static final String NAME = "uint32";
+
+        @Override
+        public String getBoxedJavaType(HeaderGenerator headerGenerator) {
+            return "Long";
+        }
+
+        @Override
+        public Optional<Integer> fixedLength() {
+            return Optional.of(4);
+        }
+
+        @Override
+        public String toString() {
+            return NAME;
+        }
+    }
+
     final class Int64FieldType implements FieldType {
         static final Int64FieldType INSTANCE = new Int64FieldType();
         private static final String NAME = "int64";
@@ -369,6 +389,8 @@ public interface FieldType {
                 return Int16FieldType.INSTANCE;
             case Uint16FieldType.NAME:
                 return Uint16FieldType.INSTANCE;
+            case Uint32FieldType.NAME:
+                return Uint32FieldType.INSTANCE;
             case Int32FieldType.NAME:
                 return Int32FieldType.INSTANCE;
             case Int64FieldType.NAME:
diff --git 
a/generator/src/main/java/org/apache/kafka/message/JsonConverterGenerator.java 
b/generator/src/main/java/org/apache/kafka/message/JsonConverterGenerator.java
index 2df8170b05..8ce07b9275 100644
--- 
a/generator/src/main/java/org/apache/kafka/message/JsonConverterGenerator.java
+++ 
b/generator/src/main/java/org/apache/kafka/message/JsonConverterGenerator.java
@@ -164,6 +164,11 @@ public final class JsonConverterGenerator implements 
MessageClassGenerator {
             buffer.printf("%s;%n", target.assignmentStatement(
                 String.format("MessageUtil.jsonNodeToUnsignedShort(%s, 
\"%s\")",
                     target.sourceVariable(), target.humanReadableName())));
+        } else if (target.field().type() instanceof FieldType.Uint32FieldType) 
{
+            headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
+            buffer.printf("%s;%n", target.assignmentStatement(
+                 String.format("MessageUtil.jsonNodeToUnsignedInt(%s, \"%s\")",
+                     target.sourceVariable(), target.humanReadableName())));
         } else if (target.field().type() instanceof FieldType.Int32FieldType) {
             headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
             buffer.printf("%s;%n", target.assignmentStatement(
@@ -346,7 +351,8 @@ public final class JsonConverterGenerator implements 
MessageClassGenerator {
             headerGenerator.addImport(MessageGenerator.INT_NODE_CLASS);
             buffer.printf("%s;%n", target.assignmentStatement(
                 String.format("new IntNode(%s)", target.sourceVariable())));
-        } else if (target.field().type() instanceof FieldType.Int64FieldType) {
+        } else if (target.field().type() instanceof FieldType.Int64FieldType ||
+                (target.field().type() instanceof FieldType.Uint32FieldType)) {
             headerGenerator.addImport(MessageGenerator.LONG_NODE_CLASS);
             buffer.printf("%s;%n", target.assignmentStatement(
                 String.format("new LongNode(%s)", target.sourceVariable())));
diff --git 
a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java 
b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
index b9923ee572..235667480c 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
@@ -541,6 +541,8 @@ public final class MessageDataGenerator implements 
MessageClassGenerator {
             return "_readable.readShort()";
         } else if (type instanceof FieldType.Uint16FieldType) {
             return "_readable.readUnsignedShort()";
+        } else if (type instanceof FieldType.Uint32FieldType) {
+            return "_readable.readUnsignedInt()";
         } else if (type instanceof FieldType.Int32FieldType) {
             return "_readable.readInt()";
         } else if (type instanceof FieldType.Int64FieldType) {
@@ -848,6 +850,8 @@ public final class MessageDataGenerator implements 
MessageClassGenerator {
             return String.format("_writable.writeShort(%s)", name);
         } else if (type instanceof FieldType.Uint16FieldType) {
             return String.format("_writable.writeUnsignedShort(%s)", name);
+        } else if (type instanceof FieldType.Uint32FieldType) {
+            return String.format("_writable.writeUnsignedInt(%s)", name);
         } else if (type instanceof FieldType.Int32FieldType) {
             return String.format("_writable.writeInt(%s)", name);
         } else if (type instanceof FieldType.Int64FieldType) {
@@ -1372,7 +1376,8 @@ public final class MessageDataGenerator implements 
MessageClassGenerator {
                     (field.type() instanceof FieldType.Int32FieldType)) {
             buffer.printf("hashCode = 31 * hashCode + %s;%n",
                 field.camelCaseName());
-        } else if (field.type() instanceof FieldType.Int64FieldType) {
+        } else if (field.type() instanceof FieldType.Int64FieldType ||
+                    (field.type() instanceof FieldType.Uint32FieldType)) {
             buffer.printf("hashCode = 31 * hashCode + ((int) (%s >> 32) ^ 
(int) %s);%n",
                 field.camelCaseName(), field.camelCaseName());
         } else if (field.type() instanceof FieldType.UUIDFieldType) {
@@ -1427,6 +1432,7 @@ public final class MessageDataGenerator implements 
MessageClassGenerator {
                 (field.type() instanceof FieldType.Int8FieldType) ||
                 (field.type() instanceof FieldType.Int16FieldType) ||
                 (field.type() instanceof FieldType.Uint16FieldType) ||
+                (field.type() instanceof FieldType.Uint32FieldType) ||
                 (field.type() instanceof FieldType.Int32FieldType) ||
                 (field.type() instanceof FieldType.Int64FieldType) ||
                 (field.type() instanceof FieldType.Float64FieldType) ||
@@ -1514,6 +1520,7 @@ public final class MessageDataGenerator implements 
MessageClassGenerator {
         } else if ((field.type() instanceof FieldType.Int8FieldType) ||
                 (field.type() instanceof FieldType.Int16FieldType) ||
                 (field.type() instanceof FieldType.Uint16FieldType) ||
+                (field.type() instanceof FieldType.Uint32FieldType) ||
                 (field.type() instanceof FieldType.Int32FieldType) ||
                 (field.type() instanceof FieldType.Int64FieldType) ||
                 (field.type() instanceof FieldType.Float64FieldType)) {
@@ -1576,13 +1583,21 @@ public final class MessageDataGenerator implements 
MessageClassGenerator {
             field.fieldAbstractJavaType(headerGenerator, structRegistry));
         buffer.incrementIndent();
         if (field.type() instanceof FieldType.Uint16FieldType) {
-            buffer.printf("if (v < 0 || v > 65535) {%n");
+            buffer.printf("if (v < 0 || v > %d) {%n", 
MessageGenerator.UNSIGNED_SHORT_MAX);
             buffer.incrementIndent();
             buffer.printf("throw new RuntimeException(\"Invalid value \" + v + 
" +
                     "\" for unsigned short field.\");%n");
             buffer.decrementIndent();
             buffer.printf("}%n");
         }
+        if (field.type() instanceof FieldType.Uint32FieldType) {
+            buffer.printf("if (v < 0 || v > %dL) {%n", 
MessageGenerator.UNSIGNED_INT_MAX);
+            buffer.incrementIndent();
+            buffer.printf("throw new RuntimeException(\"Invalid value \" + v + 
" +
+                    "\" for unsigned int field.\");%n");
+            buffer.decrementIndent();
+            buffer.printf("}%n");
+        }
         buffer.printf("this.%s = v;%n", field.camelCaseName());
         buffer.printf("return this;%n");
         buffer.decrementIndent();
diff --git 
a/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java 
b/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
index cfbeae84ec..56f3f6ab0b 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
@@ -156,6 +156,10 @@ public final class MessageGenerator {
 
     static final String DOUBLE_NODE_CLASS = 
"com.fasterxml.jackson.databind.node.DoubleNode";
 
+    static final long UNSIGNED_INT_MAX = 4294967295L;
+
+    static final int UNSIGNED_SHORT_MAX = 65535;
+
     /**
      * The Jackson serializer we use for JSON objects.
      */
diff --git 
a/generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java 
b/generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java
index 5ebd158839..a5ae8300d5 100644
--- a/generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java
@@ -250,6 +250,12 @@ final class SchemaGenerator {
                 throw new RuntimeException("Type " + type + " cannot be 
nullable.");
             }
             return "Type.UINT16";
+        } else if (type instanceof FieldType.Uint32FieldType) {
+            headerGenerator.addImport(MessageGenerator.TYPE_CLASS);
+            if (nullable) {
+                throw new RuntimeException("Type " + type + " cannot be 
nullable.");
+            }
+            return "Type.UNSIGNED_INT32";
         } else if (type instanceof FieldType.Int32FieldType) {
             headerGenerator.addImport(MessageGenerator.TYPE_CLASS);
             if (nullable) {
diff --git 
a/generator/src/test/java/org/apache/kafka/message/MessageGeneratorTest.java 
b/generator/src/test/java/org/apache/kafka/message/MessageGeneratorTest.java
index 07766f23f5..8eb38e999c 100644
--- a/generator/src/test/java/org/apache/kafka/message/MessageGeneratorTest.java
+++ b/generator/src/test/java/org/apache/kafka/message/MessageGeneratorTest.java
@@ -67,4 +67,10 @@ public class MessageGeneratorTest {
         } catch (RuntimeException e) {
         }
     }
+
+    @Test
+    public void testConstants() {
+        assertEquals(MessageGenerator.UNSIGNED_SHORT_MAX, 0xFFFF);
+        assertEquals(MessageGenerator.UNSIGNED_INT_MAX, 0xFFFFFFFFL);
+    }
 }

Reply via email to