This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 58d62d15227 KAFKA-19634 Formalize nullable and non-nullable type 
distinctions in protocol specification (#20614)
58d62d15227 is described below

commit 58d62d152275e29cb74a84e925624e1e9a681c8d
Author: Lan Ding <[email protected]>
AuthorDate: Fri Nov 28 21:50:26 2025 +0800

    KAFKA-19634 Formalize nullable and non-nullable type distinctions in 
protocol specification (#20614)
    
    This patch introduces a clear separation between nullable and
    non-nullable data structures. The key changes include:
    
    1. Differentiates between nullable and non-nullable versions of
    `RECORDS`, `COMPACT_RECORDS`, and `Schema` types.
    2. Adds explicit nullable type names for `ArrayOf` and `CompactArrayOf`.
    3. Introduces a new, concise syntax for representing types:
       - `{}` for struct, `?{}` for nullable struct
       - `[T]` for array, `?[T]` for nullable array
       - `(T)` for compact array, `?(T)` for nullable compact array
    4. Declares shared schemas as non-nullable `Schema` by default. A field
    that references a shared schema and is nullable must be explicitly
    declared as a new `NullableSchema(X)`.
    5. Adds unit tests (UTs) to verify the consistency between schema and
    message serialization.
    
    Reviewers: Jun Rao <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../org/apache/kafka/common/protocol/ApiKeys.java  |  13 +-
 .../org/apache/kafka/common/protocol/Protocol.java |  16 +-
 .../kafka/common/protocol/types/ArrayOf.java       |  33 +++-
 .../kafka/common/protocol/types/BoundField.java    |   2 +-
 .../common/protocol/types/CompactArrayOf.java      |  32 +++-
 .../common/protocol/types/NullableSchema.java      | 103 ++++++++++++
 .../apache/kafka/common/protocol/types/Schema.java |  34 +++-
 .../apache/kafka/common/protocol/types/Type.java   | 160 +++++++++++++++---
 .../src/main/resources/common/message/README.md    |   6 +-
 .../message/ProtocolRoundTripConsistencyTest.java  | 180 +++++++++++++++++++++
 .../protocol/types/ProtocolSerializationTest.java  |   4 +-
 .../kafka/common/protocol/types/TypeTest.java      |  24 +--
 .../resources/common/message/AllTypeMessage.json   |  52 ++++++
 .../org/apache/kafka/message/MessageGenerator.java |   2 +
 .../org/apache/kafka/message/SchemaGenerator.java  |  16 +-
 15 files changed, 609 insertions(+), 68 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 89b952e6ce7..79b283b4f8d 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -35,7 +35,10 @@ import java.util.stream.Collectors;
 import static org.apache.kafka.common.protocol.types.Type.BYTES;
 import static org.apache.kafka.common.protocol.types.Type.COMPACT_BYTES;
 import static 
org.apache.kafka.common.protocol.types.Type.COMPACT_NULLABLE_BYTES;
+import static 
org.apache.kafka.common.protocol.types.Type.COMPACT_NULLABLE_RECORDS;
+import static org.apache.kafka.common.protocol.types.Type.COMPACT_RECORDS;
 import static org.apache.kafka.common.protocol.types.Type.NULLABLE_BYTES;
+import static org.apache.kafka.common.protocol.types.Type.NULLABLE_RECORDS;
 import static org.apache.kafka.common.protocol.types.Type.RECORDS;
 
 /**
@@ -135,7 +138,6 @@ public enum ApiKeys {
     DESCRIBE_SHARE_GROUP_OFFSETS(ApiMessageType.DESCRIBE_SHARE_GROUP_OFFSETS),
     ALTER_SHARE_GROUP_OFFSETS(ApiMessageType.ALTER_SHARE_GROUP_OFFSETS),
     DELETE_SHARE_GROUP_OFFSETS(ApiMessageType.DELETE_SHARE_GROUP_OFFSETS);
-    
 
     private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> 
APIS_BY_LISTENER =
         new EnumMap<>(ApiMessageType.ListenerType.class);
@@ -342,9 +344,14 @@ public enum ApiKeys {
         Schema.Visitor detector = new Schema.Visitor() {
             @Override
             public void visit(Type field) {
-                if (field == BYTES || field == NULLABLE_BYTES || field == 
RECORDS ||
-                    field == COMPACT_BYTES || field == COMPACT_NULLABLE_BYTES)
+                // avoid BooleanExpressionComplexity checkstyle warning
+                boolean isBytesType = field == BYTES || field == 
NULLABLE_BYTES ||
+                    field == COMPACT_BYTES || field == COMPACT_NULLABLE_BYTES;
+                boolean isRecordsType = field == RECORDS || field == 
NULLABLE_RECORDS ||
+                    field == COMPACT_RECORDS || field == 
COMPACT_NULLABLE_RECORDS;
+                if (isBytesType || isRecordsType) {
                     hasBuffer.set(true);
+                }
             }
         };
         schema.walk(detector);
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 1b051d58bf0..e9d7609403c 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -40,13 +40,16 @@ public class Protocol {
         final String indentStr = indentString(indentSize);
         final Map<String, Type> subTypes = new LinkedHashMap<>();
 
+        b.append(schema.leftBracket());
+        b.append(" ");
         // Top level fields
         for (BoundField field: schema.fields()) {
             Type type = field.def.type;
             if (type.isArray()) {
-                b.append("[");
+                b.append(type.leftBracket());
                 b.append(field.def.name);
-                b.append("] ");
+                b.append(type.rightBracket());
+                b.append(" ");
                 if (!subTypes.containsKey(field.def.name)) {
                     subTypes.put(field.def.name, 
type.arrayElementType().get());
                 }
@@ -54,9 +57,9 @@ public class Protocol {
                 Map<Integer, Field> taggedFields = new 
TreeMap<>(((TaggedFields) type).fields());
                 taggedFields.forEach((tag, taggedField) -> {
                     if (taggedField.type.isArray()) {
-                        b.append("[");
+                        b.append(type.leftBracket());
                         b.append(taggedField.name);
-                        b.append("]");
+                        b.append(type.rightBracket());
                         if (!subTypes.containsKey(taggedField.name))
                             subTypes.put(taggedField.name + "&lt;tag: " + 
tag.toString() + "&gt;", taggedField.type.arrayElementType().get());
                     } else {
@@ -75,6 +78,7 @@ public class Protocol {
                     subTypes.put(field.def.name, type);
             }
         }
+        b.append(schema.rightBracket());
         b.append("\n");
 
         // Sub Types/Schemas
@@ -227,14 +231,14 @@ public class Protocol {
                 b.append(" Response (Version: ");
                 b.append(version);
                 b.append(") => ");
-                schemaToBnfHtml(responses[version], b, 2);
+                schemaToBnfHtml(schema, b, 2);
                 b.append("</pre>");
 
                 b.append("<p><b>Response header version:</b> ");
                 b.append(key.responseHeaderVersion((short) version));
                 b.append("</p>\n");
 
-                schemaToFieldTableHtml(responses[version], b);
+                schemaToFieldTableHtml(schema, b);
                 b.append("</div>\n");
             }
         }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
index 3333084ef66..17c827744b3 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
@@ -28,6 +28,8 @@ public class ArrayOf extends DocumentedType {
 
     private static final String ARRAY_TYPE_NAME = "ARRAY";
 
+    private static final String NULLABLE_ARRAY_TYPE_NAME = "NULLABLE_ARRAY";
+
     private final Type type;
     private final boolean nullable;
 
@@ -97,9 +99,20 @@ public class ArrayOf extends DocumentedType {
         return Optional.of(type);
     }
 
+    @Override
+    public String leftBracket() {
+        return nullable ? "?[" : "[";
+    }
+
+    @Override
+    public String rightBracket() {
+        return "]";
+    }
+
     @Override
     public String toString() {
-        return ARRAY_TYPE_NAME + "(" + type + ")";
+        String name = nullable ? NULLABLE_ARRAY_TYPE_NAME : ARRAY_TYPE_NAME;
+        return name + "(" + type + ")";
     }
 
     @Override
@@ -119,15 +132,27 @@ public class ArrayOf extends DocumentedType {
 
     @Override
     public String typeName() {
-        return ARRAY_TYPE_NAME;
+        return nullable ? NULLABLE_ARRAY_TYPE_NAME : ARRAY_TYPE_NAME;
     }
 
     @Override
     public String documentation() {
-        return "Represents a sequence of objects of a given type T. " +
+        String doc;
+        if (nullable) {
+            doc = "Represents a sequence of objects of a given type T. " +
                 "Type T can be either a primitive type (e.g. " + STRING + ") 
or a structure. " +
                 "First, the length N is given as an " + INT32 + ". Then N 
instances of type T follow. " +
                 "A null array is represented with a length of -1. " +
-                "In protocol documentation an array of T instances is referred 
to as [T].";
+                "In protocol documentation a nullable array of T instances is 
referred to as " +
+                leftBracket() + "T" + rightBracket() + ".";
+        } else {
+            doc = "Represents a sequence of objects of a given type T. " +
+                "Type T can be either a primitive type (e.g. " + STRING + ") 
or a structure. " +
+                "First, the length N is given as an " + INT32 + ". Then N 
instances of type T follow. " +
+                "In protocol documentation an array of T instances is referred 
to as " +
+                leftBracket() + "T" + rightBracket() + ".";
+        }
+
+        return doc;
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/types/BoundField.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/types/BoundField.java
index b031b4fce82..e40931bc7ed 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/protocol/types/BoundField.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/protocol/types/BoundField.java
@@ -29,7 +29,7 @@ public class BoundField {
         this.schema = schema;
         this.index = index;
     }
-
+    
     @Override
     public String toString() {
         return def.name + ":" + def.type;
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/types/CompactArrayOf.java
 
b/clients/src/main/java/org/apache/kafka/common/protocol/types/CompactArrayOf.java
index 6a252a8a4cc..e41a4c0dc7f 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/protocol/types/CompactArrayOf.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/protocol/types/CompactArrayOf.java
@@ -30,6 +30,8 @@ import java.util.Optional;
 public class CompactArrayOf extends DocumentedType {
     private static final String COMPACT_ARRAY_TYPE_NAME = "COMPACT_ARRAY";
 
+    private static final String COMPACT_NULLABLE_ARRAY_TYPE_NAME = 
"COMPACT_NULLABLE_ARRAY";
+
     private final Type type;
     private final boolean nullable;
 
@@ -103,9 +105,20 @@ public class CompactArrayOf extends DocumentedType {
         return Optional.of(type);
     }
 
+    @Override
+    public String leftBracket() {
+        return nullable ? "?(" : "(";
+    }
+
+    @Override
+    public String rightBracket() {
+        return ")";
+    }
+
     @Override
     public String toString() {
-        return COMPACT_ARRAY_TYPE_NAME + "(" + type + ")";
+        String name = nullable ? COMPACT_NULLABLE_ARRAY_TYPE_NAME : 
COMPACT_ARRAY_TYPE_NAME;
+        return name + "(" + type + ")";
     }
 
     @Override
@@ -125,15 +138,26 @@ public class CompactArrayOf extends DocumentedType {
 
     @Override
     public String typeName() {
-        return COMPACT_ARRAY_TYPE_NAME;
+        return nullable ? COMPACT_NULLABLE_ARRAY_TYPE_NAME : 
COMPACT_ARRAY_TYPE_NAME;
     }
 
     @Override
     public String documentation() {
-        return "Represents a sequence of objects of a given type T. " +
+        String doc;
+        if (nullable) {
+            doc = "Represents a sequence of objects of a given type T. " +
                 "Type T can be either a primitive type (e.g. " + STRING + ") 
or a structure. " +
                 "First, the length N + 1 is given as an UNSIGNED_VARINT. Then 
N instances of type T follow. " +
                 "A null array is represented with a length of 0. " +
-                "In protocol documentation an array of T instances is referred 
to as [T].";
+                "In protocol documentation a compact nullable array of T 
instances is referred to as " +
+                leftBracket() + "T" + rightBracket() + ".";
+        } else {
+            doc = "Represents a sequence of objects of a given type T. " +
+                "Type T can be either a primitive type (e.g. " + STRING + ") 
or a structure. " +
+                "First, the length N + 1 is given as an UNSIGNED_VARINT. Then 
N instances of type T follow. " +
+                "In protocol documentation a compact array of T instances is 
referred to as " +
+                leftBracket() + "T" + rightBracket() + ".";
+        }
+        return doc;
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/types/NullableSchema.java
 
b/clients/src/main/java/org/apache/kafka/common/protocol/types/NullableSchema.java
new file mode 100644
index 00000000000..f5011db5421
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/protocol/types/NullableSchema.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol.types;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * The nullable schema for a compound record definition
+ */
+public final class NullableSchema extends Schema {
+
+    private static final String NULLABLE_STRUCT_TYPE_NAME = "NULLABLE_STRUCT";
+
+    public NullableSchema(Schema schema) {
+        super(schema.tolerateMissingFieldsWithDefaults(), 
Arrays.stream(schema.fields()).map(field -> field.def).toArray(Field[]::new));
+    }
+
+    @Override
+    public boolean isNullable() {
+        return true;
+    }
+
+    /**
+     * Write a struct to the buffer with special handling for null values
+     * If the input object is null, writes a byte value of -1 to the buffer as 
a null indicator.
+     */
+    @Override
+    public void write(ByteBuffer buffer, Object o) {
+        if (o == null) {
+            buffer.put((byte) -1);
+            return;
+        }
+
+        buffer.put((byte) 1);
+        super.write(buffer, o);
+    }
+
+    @Override
+    public Struct read(ByteBuffer buffer) {
+        byte nullIndicator = buffer.get();
+        if (nullIndicator < 0)
+            return null;
+
+        return super.read(buffer);
+    }
+
+    @Override
+    public int sizeOf(Object o) {
+        if (o == null)
+            return 1;
+
+        return 1 + super.sizeOf(o);
+    }
+
+    @Override
+    public Struct validate(Object item) {
+        if (item == null)
+            return null;
+
+        return super.validate(item);
+    }
+
+    @Override
+    public String typeName() {
+        return NULLABLE_STRUCT_TYPE_NAME;
+    }
+
+    @Override
+    public String leftBracket() {
+        return "?{";
+    }
+
+    @Override
+    public String rightBracket() {
+        return "}";
+    }
+    
+    @Override
+    public String documentation() {
+        return "A nullable struct is named by a string with a capitalized 
first letter and consists of one or more fields. " +
+            "It represents a composite object or null. " +
+            "For non-null values, the first byte has value 1, " +
+            "followed by the serialization of each field in the order they are 
defined. " +
+            "A null value is encoded as a byte with value -1 and there are no 
following bytes." +
+            "In protocol documentation a nullable struct containing multiple 
fields is enclosed by " + 
+            leftBracket() + " and " + rightBracket() + ".";
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
index 325a77fe43f..9cfc51dd76f 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.protocol.types;
 
+import org.apache.kafka.common.protocol.types.Type.DocumentedType;
+
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
@@ -24,7 +26,9 @@ import java.util.Objects;
 /**
  * The schema for a compound record definition
  */
-public final class Schema extends Type {
+public class Schema extends DocumentedType {
+    private static final String STRUCT_TYPE_NAME = "STRUCT";
+
     private static final Object[] NO_VALUES = new Object[0];
 
     private final BoundField[] fields;
@@ -53,6 +57,7 @@ public final class Schema extends Type {
      *
      * @throws SchemaException If the given list have duplicate fields
      */
+    @SuppressWarnings("this-escape")
     public Schema(boolean tolerateMissingFieldsWithDefaults, Field... fs) {
         this.fields = new BoundField[fs.length];
         this.fieldsByName = new HashMap<>();
@@ -173,6 +178,20 @@ public final class Schema extends Type {
         return this.fields;
     }
 
+    protected boolean tolerateMissingFieldsWithDefaults() {
+        return this.tolerateMissingFieldsWithDefaults;
+    }
+
+    @Override
+    public String leftBracket() {
+        return "{";
+    }
+
+    @Override
+    public String rightBracket() {
+        return "}";
+    }
+
     /**
      * Display a string representation of the schema
      */
@@ -206,6 +225,19 @@ public final class Schema extends Type {
         }
     }
 
+    @Override
+    public String typeName() {
+        return STRUCT_TYPE_NAME;
+    }
+
+    @Override
+    public String documentation() {
+        return "A struct is named by a string with a capitalized first letter 
and consists of one or more fields. " +
+            "It represents a composite object encoded as the serialization of 
each field in the order they are defined." + 
+            "In protocol documentation a struct containing multiple fields is 
enclosed by " + 
+            leftBracket() + " and " + rightBracket() + ".";
+    }
+
     public void walk(Visitor visitor) {
         Objects.requireNonNull(visitor, "visitor must be non-null");
         handleNode(this, visitor);
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
index f4c0ee5705c..91f578262f9 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
@@ -79,6 +79,20 @@ public abstract class Type {
         return arrayElementType().isPresent();
     }
 
+    /**
+     * For annotation in the generated html protocol doc.
+     */
+    public String leftBracket() {
+        return "";
+    }
+
+    /**
+     * For annotation in the generated html protocol doc.
+     */
+    public String rightBracket() {
+        return "";
+    }
+
     /**
      * A Type that can return its description for documentation purposes.
      */
@@ -774,7 +788,7 @@ public abstract class Type {
         @Override
         public String documentation() {
             return "Represents a raw sequence of bytes. First the length N+1 
is given as an UNSIGNED_VARINT." +
-                    "Then N bytes follow.";
+                    " Then N bytes follow.";
         }
     };
 
@@ -912,23 +926,17 @@ public abstract class Type {
         @Override
         public String documentation() {
             return "Represents a raw sequence of bytes. First the length N+1 
is given as an UNSIGNED_VARINT." +
-                    "Then N bytes follow. A null object is represented with a 
length of 0.";
+                    " Then N bytes follow. A null object is represented with a 
length of 0.";
         }
     };
 
-    public static final DocumentedType COMPACT_RECORDS = new DocumentedType() {
-        @Override
-        public boolean isNullable() {
-            return true;
-        }
+    public static final DocumentedType RECORDS = new DocumentedType() {
 
         @Override
         public void write(ByteBuffer buffer, Object o) {
-            if (o == null) {
-                COMPACT_NULLABLE_BYTES.write(buffer, null);
-            } else if (o instanceof MemoryRecords) {
+            if (o instanceof MemoryRecords) {
                 MemoryRecords records = (MemoryRecords) o;
-                COMPACT_NULLABLE_BYTES.write(buffer, 
records.buffer().duplicate());
+                BYTES.write(buffer, records.buffer().duplicate());
             } else {
                 throw new IllegalArgumentException("Unexpected record type: " 
+ o.getClass());
             }
@@ -936,20 +944,57 @@ public abstract class Type {
 
         @Override
         public MemoryRecords read(ByteBuffer buffer) {
-            ByteBuffer recordsBuffer = (ByteBuffer) 
COMPACT_NULLABLE_BYTES.read(buffer);
-            if (recordsBuffer == null) {
-                return null;
-            } else {
-                return MemoryRecords.readableRecords(recordsBuffer);
-            }
+            ByteBuffer recordsBuffer = (ByteBuffer) BYTES.read(buffer);
+            return MemoryRecords.readableRecords(recordsBuffer);
         }
 
         @Override
         public int sizeOf(Object o) {
-            if (o == null) {
-                return 1;
+            BaseRecords records = (BaseRecords) o;
+            return 4 + records.sizeInBytes();
+        }
+
+        @Override
+        public String typeName() {
+            return "RECORDS";
+        }
+
+        @Override
+        public BaseRecords validate(Object item) {
+            if (item instanceof MemoryRecords)
+                return (BaseRecords) item;
+
+            throw new SchemaException(item + " is not an instance of " + 
MemoryRecords.class.getName());
+        }
+
+        @Override
+        public String documentation() {
+            return "Represents a sequence of Kafka records as " + BYTES + ". " 
+
+                "For a detailed description of records see " +
+                "<a href=\"/documentation/#messageformat\">Message Sets</a>.";
+        }
+    };
+
+    public static final DocumentedType COMPACT_RECORDS = new DocumentedType() {
+
+        @Override
+        public void write(ByteBuffer buffer, Object o) {
+            if (o instanceof MemoryRecords) {
+                MemoryRecords records = (MemoryRecords) o;
+                COMPACT_BYTES.write(buffer, records.buffer().duplicate());
+            } else {
+                throw new IllegalArgumentException("Unexpected record type: " 
+ o.getClass());
             }
+        }
+
+        @Override
+        public MemoryRecords read(ByteBuffer buffer) {
+            ByteBuffer recordsBuffer = (ByteBuffer) COMPACT_BYTES.read(buffer);
+            return MemoryRecords.readableRecords(recordsBuffer);
+        }
 
+        @Override
+        public int sizeOf(Object o) {
             BaseRecords records = (BaseRecords) o;
             int recordsSize = records.sizeInBytes();
             return ByteUtils.sizeOfUnsignedVarint(recordsSize + 1) + 
recordsSize;
@@ -962,9 +1007,6 @@ public abstract class Type {
 
         @Override
         public BaseRecords validate(Object item) {
-            if (item == null)
-                return null;
-
             if (item instanceof BaseRecords)
                 return (BaseRecords) item;
 
@@ -973,13 +1015,13 @@ public abstract class Type {
 
         @Override
         public String documentation() {
-            return "Represents a sequence of Kafka records as " + 
COMPACT_NULLABLE_BYTES + ". " +
+            return "Represents a sequence of Kafka records as " + 
COMPACT_BYTES + ". " +
                 "For a detailed description of records see " +
                 "<a href=\"/documentation/#messageformat\">Message Sets</a>.";
         }
     };
 
-    public static final DocumentedType RECORDS = new DocumentedType() {
+    public static final DocumentedType NULLABLE_RECORDS = new DocumentedType() 
{
         @Override
         public boolean isNullable() {
             return true;
@@ -1018,7 +1060,7 @@ public abstract class Type {
 
         @Override
         public String typeName() {
-            return "RECORDS";
+            return "NULLABLE_RECORDS";
         }
 
         @Override
@@ -1040,6 +1082,69 @@ public abstract class Type {
         }
     };
 
+    public static final DocumentedType COMPACT_NULLABLE_RECORDS = new 
DocumentedType() {
+        @Override
+        public boolean isNullable() {
+            return true;
+        }
+
+        @Override
+        public void write(ByteBuffer buffer, Object o) {
+            if (o == null) {
+                COMPACT_NULLABLE_BYTES.write(buffer, null);
+            } else if (o instanceof MemoryRecords) {
+                MemoryRecords records = (MemoryRecords) o;
+                COMPACT_NULLABLE_BYTES.write(buffer, 
records.buffer().duplicate());
+            } else {
+                throw new IllegalArgumentException("Unexpected record type: " 
+ o.getClass());
+            }
+        }
+
+        @Override
+        public MemoryRecords read(ByteBuffer buffer) {
+            ByteBuffer recordsBuffer = (ByteBuffer) 
COMPACT_NULLABLE_BYTES.read(buffer);
+            if (recordsBuffer == null) {
+                return null;
+            } else {
+                return MemoryRecords.readableRecords(recordsBuffer);
+            }
+        }
+
+        @Override
+        public int sizeOf(Object o) {
+            if (o == null) {
+                return 1;
+            }
+
+            BaseRecords records = (BaseRecords) o;
+            int recordsSize = records.sizeInBytes();
+            return ByteUtils.sizeOfUnsignedVarint(recordsSize + 1) + 
recordsSize;
+        }
+
+        @Override
+        public String typeName() {
+            return "COMPACT_NULLABLE_RECORDS";
+        }
+
+        @Override
+        public BaseRecords validate(Object item) {
+            if (item == null)
+                return null;
+
+            if (item instanceof BaseRecords)
+                return (BaseRecords) item;
+
+            throw new SchemaException(item + " is not an instance of " + 
BaseRecords.class.getName());
+        }
+
+        @Override
+        public String documentation() {
+            return "Represents a sequence of Kafka records as " + 
COMPACT_NULLABLE_BYTES + ". " +
+                "For a detailed description of records see " +
+                "<a href=\"/documentation/#messageformat\">Message Sets</a>.";
+        }
+    };
+
     public static final DocumentedType VARINT = new DocumentedType() {
         @Override
         public void write(ByteBuffer buffer, Object o) {
@@ -1116,7 +1221,10 @@ public abstract class Type {
             UINT16, UNSIGNED_INT32, VARINT, VARLONG, UUID, FLOAT64,
             STRING, COMPACT_STRING, NULLABLE_STRING, COMPACT_NULLABLE_STRING,
             BYTES, COMPACT_BYTES, NULLABLE_BYTES, COMPACT_NULLABLE_BYTES,
-            RECORDS, COMPACT_RECORDS, new ArrayOf(STRING), new 
CompactArrayOf(COMPACT_STRING)};
+            RECORDS, COMPACT_RECORDS, NULLABLE_RECORDS, 
COMPACT_NULLABLE_RECORDS,
+            new ArrayOf(STRING), new CompactArrayOf(COMPACT_STRING), 
ArrayOf.nullable(STRING), CompactArrayOf.nullable(STRING),
+            new Schema(), new NullableSchema(new Schema())};
+
         final StringBuilder b = new StringBuilder();
         b.append("<table class=\"data-table\"><tbody>\n");
         b.append("<tr>");
diff --git a/clients/src/main/resources/common/message/README.md 
b/clients/src/main/resources/common/message/README.md
index 435635214f2..86c8f350ae3 100644
--- a/clients/src/main/resources/common/message/README.md
+++ b/clients/src/main/resources/common/message/README.md
@@ -91,6 +91,8 @@ There are several primitive field types available.
 
 * "records": recordset such as memory recordset.
 
+* "struct": a composite object consisting of one or more fields.
+
 In addition to these primitive field types, there is also an array type.  Array
 types start with a "[]" and end with the name of the element type.  For
 example, []Foo declares an array of "Foo" objects.  Array fields have their own
@@ -101,8 +103,8 @@ Guide](https://kafka.apache.org/protocol.html).
 
 Nullable Fields
 ---------------
-Booleans, ints, and floats can never be null.  However, fields that are 
strings,
-bytes, uuid, records, or arrays may optionally be "nullable".  When a field is 
+Booleans, ints, floats and uuid can never be null.  However, fields that are 
strings,
+bytes, records, struct, or arrays may optionally be "nullable".  When a field 
is 
 "nullable", that simply means that we are prepared to serialize and deserialize
 null entries for that field.
 
diff --git 
a/clients/src/test/java/org/apache/kafka/common/message/ProtocolRoundTripConsistencyTest.java
 
b/clients/src/test/java/org/apache/kafka/common/message/ProtocolRoundTripConsistencyTest.java
new file mode 100644
index 00000000000..d86cd2d543d
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/common/message/ProtocolRoundTripConsistencyTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.message;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.ObjectSerializationCache;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.MemoryRecords;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.TreeMap;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ProtocolRoundTripConsistencyTest {
+
+    private Struct nonFlexibleStruct;
+
+    private Struct flexibleStruct;
+
+    private AllTypeMessageData messageData;
+
+    @BeforeEach
+    public void setup() {
+        nonFlexibleStruct = new Struct(AllTypeMessageData.SCHEMA_0)
+            .set("my_boolean", false)
+            .set("my_int8", (byte) 12)
+            .set("my_int16", (short) 123)
+            .set("my_uint16", 33000)
+            .set("my_int32", 1234)
+            .set("my_uint32", 1234567L)
+            .set("my_uint64", 0xcafcacafcacafcaL)
+            .set("my_uuid", Uuid.fromString("H3KKO4NTRPaCWtEmm3vW7A"))
+            .set("my_float64", 12.34D)
+            .set("my_string", "string")
+            .set("my_nullable_string", null)
+            .set("my_bytes", ByteBuffer.wrap("bytes".getBytes()))
+            .set("my_nullable_bytes", null)
+            .set("my_records", MemoryRecords.EMPTY)
+            .set("my_nullable_records", null)
+            .set("my_int_array", new Object[] {})
+            .set("my_nullable_int_array", null);
+        nonFlexibleStruct.set("my_common_struct", 
nonFlexibleStruct.instance("my_common_struct")
+            .set("foo", 123)
+            .set("bar", 123));
+
+        flexibleStruct = new Struct(AllTypeMessageData.SCHEMA_1)
+            .set("my_boolean", false)
+            .set("my_int8", (byte) 12)
+            .set("my_int16", (short) 123)
+            .set("my_uint16", 33000)
+            .set("my_int32", 1234)
+            .set("my_uint32", 1234567L)
+            .set("my_uint64", 0xcafcacafcacafcaL)
+            .set("my_uuid", Uuid.fromString("H3KKO4NTRPaCWtEmm3vW7A"))
+            .set("my_float64", 12.34D)
+            .set("my_compact_string", "compact string")
+            .set("my_compact_nullable_string", null)
+            .set("my_compact_bytes", ByteBuffer.wrap("compact 
bytes".getBytes()))
+            .set("my_compact_nullable_bytes", null)
+            .set("my_compact_records", MemoryRecords.EMPTY)
+            .set("my_compact_nullable_records", null)
+            .set("my_int_array", new Object[] {})
+            .set("my_nullable_int_array", null)
+            .set("_tagged_fields", new TreeMap<Integer, Field>());
+        flexibleStruct.set("my_common_struct", 
flexibleStruct.instance("my_common_struct")
+            .set("foo", 123)
+            .set("bar", 123)
+            .set("_tagged_fields", new TreeMap<Integer, Field>()));
+
+        messageData = new AllTypeMessageData();
+    }
+
+    @Test
+    public void testNonFlexibleWithNullDefault() {
+        messageData.setMyBytes("bytes".getBytes());
+        messageData.setMyRecords(MemoryRecords.EMPTY);
+
+        checkSchemaAndMessageRoundTripConsistency((short) 0, messageData, 
nonFlexibleStruct);
+    }
+
+    @Test
+    public void testNonFlexibleWithNonNullValue() {
+        messageData.setMyBytes("bytes".getBytes());
+        messageData.setMyRecords(MemoryRecords.EMPTY);
+        messageData.setMyNullableString("nullable string");
+        messageData.setMyNullableBytes("nullable bytes".getBytes());
+        messageData.setMyNullableRecords(MemoryRecords.EMPTY);
+        messageData.setMyNullableIntArray(List.of(1, 2, 3));
+
+        nonFlexibleStruct.set("my_nullable_string", "nullable string")
+            .set("my_nullable_bytes", ByteBuffer.wrap("nullable 
bytes".getBytes()))
+            .set("my_nullable_records", MemoryRecords.EMPTY)
+            .set("my_nullable_int_array", new Object[] {1, 2, 3});
+
+        checkSchemaAndMessageRoundTripConsistency((short) 0, messageData, 
nonFlexibleStruct);
+    }
+
+    @Test
+    public void testFlexibleWithNullDefault() {
+        messageData.setMyCompactBytes("compact bytes".getBytes());
+        messageData.setMyCompactRecords(MemoryRecords.EMPTY);
+        messageData.setMyCommonStruct(null);
+
+        flexibleStruct.set("my_common_struct", null);
+
+        checkSchemaAndMessageRoundTripConsistency((short) 1, messageData, 
flexibleStruct);
+    }
+
+    @Test
+    public void testFlexibleWithNonNullValue() {
+        messageData.setMyCompactBytes("compact bytes".getBytes());
+        messageData.setMyCompactRecords(MemoryRecords.EMPTY);
+        messageData.setMyCompactNullableString("compact nullable string");
+        messageData.setMyCompactNullableBytes("compact nullable 
bytes".getBytes());
+        messageData.setMyCompactNullableRecords(MemoryRecords.EMPTY);
+        messageData.setMyNullableIntArray(List.of(1, 2, 3));
+
+        flexibleStruct.set("my_compact_nullable_string", "compact nullable 
string")
+            .set("my_compact_nullable_bytes", ByteBuffer.wrap("compact 
nullable bytes".getBytes()))
+            .set("my_compact_nullable_records", MemoryRecords.EMPTY)
+            .set("my_nullable_int_array", new Object[] {1, 2, 3});
+
+        checkSchemaAndMessageRoundTripConsistency((short) 1, messageData, 
flexibleStruct);
+    }
+
+    /**
+     * Serializes {@code message} with the generated code and {@code struct} with the
+     * schema-based serializer, verifies that both produce the same bytes, and then
+     * deserializes each buffer with the opposite implementation.
+     *
+     * @param version the message version; 0 selects {@code SCHEMA_0}, any other value {@code SCHEMA_1}
+     * @param message the generated message to serialize
+     * @param struct  the struct expected to be wire-equivalent to {@code message}
+     */
+    private void checkSchemaAndMessageRoundTripConsistency(short version, AllTypeMessageData message, Struct struct) {
+        ObjectSerializationCache cache = new ObjectSerializationCache();
+        ByteBuffer buf = ByteBuffer.allocate(message.size(cache, version));
+        ByteBufferAccessor serializedMessageAccessor = new ByteBufferAccessor(buf);
+        // Serialize message
+        message.write(serializedMessageAccessor, cache, version);
+
+        ByteBuffer serializedSchemaBuffer = ByteBuffer.allocate(struct.sizeOf());
+        // Serialize schema
+        struct.writeTo(serializedSchemaBuffer);
+
+        assertEquals(message.size(cache, version), serializedMessageAccessor.buffer().position(),
+            "Buffer should be completely filled to message size.");
+        assertEquals(struct.sizeOf(), serializedSchemaBuffer.position(),
+            "Buffer should be completely filled to struct size.");
+        assertEquals(serializedSchemaBuffer.position(), serializedMessageAccessor.buffer().position(),
+            "Generated and non-generated schema serializer should serialize to the same length.");
+
+        // ByteBuffer.equals() only compares the *remaining* bytes. Both buffers are full at
+        // this point (position == limit), so they must be flipped before the content check;
+        // otherwise two empty sequences are compared and the assertion passes trivially.
+        serializedMessageAccessor.flip();
+        serializedSchemaBuffer.flip();
+        assertEquals(serializedSchemaBuffer, serializedMessageAccessor.buffer(),
+            "Generated and non-generated schema serializer should serialize to the same content.");
+
+        // Deserialize message (the comparison above does not move either buffer's position)
+        Schema schema = version == 0 ? AllTypeMessageData.SCHEMA_0 : AllTypeMessageData.SCHEMA_1;
+        Struct deserializedStruct = schema.read(serializedMessageAccessor.buffer());
+        assertEquals(struct, deserializedStruct, "Deserialized struct should match original struct after round trip");
+
+        // Deserialize schema
+        AllTypeMessageData deserializedMessage = new AllTypeMessageData(new ByteBufferAccessor(serializedSchemaBuffer), version);
+        assertEquals(message, deserializedMessage, "Deserialized message should match original message after round trip");
+    }
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
 
b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
index 32ba528fe3e..de4e58f5d96 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
@@ -140,9 +140,9 @@ public class ProtocolSerializationTest {
         check(new CompactArrayOf(Type.COMPACT_STRING),
                 new Object[] {"hello", "there", "beautiful"},
                 "COMPACT_ARRAY(COMPACT_STRING)");
-        check(ArrayOf.nullable(Type.STRING), null, "ARRAY(STRING)");
+        check(ArrayOf.nullable(Type.STRING), null, "NULLABLE_ARRAY(STRING)");
         check(CompactArrayOf.nullable(Type.COMPACT_STRING), null,
-                "COMPACT_ARRAY(COMPACT_STRING)");
+                "COMPACT_NULLABLE_ARRAY(COMPACT_STRING)");
     }
 
     @Test
diff --git 
a/clients/src/test/java/org/apache/kafka/common/protocol/types/TypeTest.java 
b/clients/src/test/java/org/apache/kafka/common/protocol/types/TypeTest.java
index 5f2a027321c..7055503298f 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/types/TypeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/types/TypeTest.java
@@ -32,21 +32,21 @@ public class TypeTest {
     @Test
     public void testEmptyRecordsSerde() {
         ByteBuffer buffer = ByteBuffer.allocate(4);
-        Type.RECORDS.write(buffer, MemoryRecords.EMPTY);
+        Type.NULLABLE_RECORDS.write(buffer, MemoryRecords.EMPTY);
         buffer.flip();
-        assertEquals(4, Type.RECORDS.sizeOf(MemoryRecords.EMPTY));
+        assertEquals(4, Type.NULLABLE_RECORDS.sizeOf(MemoryRecords.EMPTY));
         assertEquals(4, buffer.limit());
-        assertEquals(MemoryRecords.EMPTY, Type.RECORDS.read(buffer));
+        assertEquals(MemoryRecords.EMPTY, Type.NULLABLE_RECORDS.read(buffer));
     }
 
     @Test
     public void testNullRecordsSerde() {
         ByteBuffer buffer = ByteBuffer.allocate(4);
-        Type.RECORDS.write(buffer, null);
+        Type.NULLABLE_RECORDS.write(buffer, null);
         buffer.flip();
-        assertEquals(4, Type.RECORDS.sizeOf(MemoryRecords.EMPTY));
+        assertEquals(4, Type.NULLABLE_RECORDS.sizeOf(MemoryRecords.EMPTY));
         assertEquals(4, buffer.limit());
-        assertNull(Type.RECORDS.read(buffer));
+        assertNull(Type.NULLABLE_RECORDS.read(buffer));
     }
 
     @Test
@@ -63,21 +63,21 @@ public class TypeTest {
     @Test
     public void testEmptyCompactRecordsSerde() {
         ByteBuffer buffer = ByteBuffer.allocate(4);
-        Type.COMPACT_RECORDS.write(buffer, MemoryRecords.EMPTY);
+        Type.COMPACT_NULLABLE_RECORDS.write(buffer, MemoryRecords.EMPTY);
         buffer.flip();
-        assertEquals(1, Type.COMPACT_RECORDS.sizeOf(MemoryRecords.EMPTY));
+        assertEquals(1, 
Type.COMPACT_NULLABLE_RECORDS.sizeOf(MemoryRecords.EMPTY));
         assertEquals(1, buffer.limit());
-        assertEquals(MemoryRecords.EMPTY, Type.COMPACT_RECORDS.read(buffer));
+        assertEquals(MemoryRecords.EMPTY, 
Type.COMPACT_NULLABLE_RECORDS.read(buffer));
     }
 
     @Test
     public void testNullCompactRecordsSerde() {
         ByteBuffer buffer = ByteBuffer.allocate(4);
-        Type.COMPACT_RECORDS.write(buffer, null);
+        Type.COMPACT_NULLABLE_RECORDS.write(buffer, null);
         buffer.flip();
-        assertEquals(1, Type.COMPACT_RECORDS.sizeOf(MemoryRecords.EMPTY));
+        assertEquals(1, 
Type.COMPACT_NULLABLE_RECORDS.sizeOf(MemoryRecords.EMPTY));
         assertEquals(1, buffer.limit());
-        assertNull(Type.COMPACT_RECORDS.read(buffer));
+        assertNull(Type.COMPACT_NULLABLE_RECORDS.read(buffer));
     }
 
     @Test
diff --git a/clients/src/test/resources/common/message/AllTypeMessage.json 
b/clients/src/test/resources/common/message/AllTypeMessage.json
new file mode 100644
index 00000000000..faadfd0fb79
--- /dev/null
+++ b/clients/src/test/resources/common/message/AllTypeMessage.json
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+{
+  "name": "AllTypeMessage",
+  "type": "header",
+  "validVersions": "0-1",
+  "flexibleVersions": "1+",
+  "fields": [
+    { "name": "myBoolean", "type": "bool", "versions": "0+", "default": false 
},
+    { "name": "myInt8", "type": "int8", "versions": "0+", "default": "12" },
+    { "name": "myInt16", "type": "int16", "versions": "0+", "default": "123" },
+    { "name": "myUint16", "type": "uint16", "versions": "0+", "default": 
"33000" },
+    { "name": "myInt32", "type": "int32", "versions": "0+", "default": "1234" 
},
+    { "name": "myUint32", "type": "uint32", "versions": "0+", "default": 
"1234567" },
+    { "name": "myUint64", "type": "int64", "versions": "0+", "default": 
"0xcafcacafcacafca" },
+    { "name": "myUuid", "type": "uuid", "versions": "0+", "default": 
"H3KKO4NTRPaCWtEmm3vW7A" },
+    { "name": "myFloat64", "type": "float64", "versions": "0+", "default": 
"12.34" },
+    { "name": "myString", "type": "string", "versions": "0", "default": 
"string" },
+    { "name": "myNullableString", "type": "string", "versions": "0", 
"nullableVersions": "0", "default": "null" },
+    { "name": "myCompactString", "type": "string", "versions": "1", "default": 
"compact string" },
+    { "name": "myCompactNullableString", "type": "string", "versions": "1", 
"nullableVersions": "1", "default": "null" },
+    { "name": "myBytes", "type": "bytes", "versions": "0" },
+    { "name": "myNullableBytes", "type": "bytes", "versions": "0", 
"nullableVersions": "0", "default": "null" },
+    { "name": "myCompactBytes", "type": "bytes", "versions": "1" },
+    { "name": "myCompactNullableBytes", "type": "bytes", "versions": "1", 
"nullableVersions": "1", "default": "null" },
+    { "name": "myRecords", "type": "records", "versions": "0" },
+    { "name": "myNullableRecords", "type": "records", "versions": "0", 
"nullableVersions": "0" },
+    { "name": "myCompactRecords", "type": "records", "versions": "1" },
+    { "name": "myCompactNullableRecords", "type": "records", "versions": "1", 
"nullableVersions": "1" },
+    { "name": "myIntArray", "type": "[]int32", "versions": "0+" },
+    { "name": "myNullableIntArray", "type": "[]int32", "versions": "0+", 
"nullableVersions": "0+", "default": "null" },
+    { "name": "myCommonStruct", "type": "TestCommonStruct", "versions": "0+", 
"nullableVersions": "1+" }
+  ],
+  "commonStructs": [
+    { "name": "TestCommonStruct", "versions": "0+", "fields": [
+      { "name": "foo", "type": "int32", "default": "123", "versions": "0+" },
+      { "name": "bar", "type": "int32", "default": "123", "versions": "0+" }
+    ]}
+  ]
+}
\ No newline at end of file
diff --git 
a/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java 
b/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
index e92a13288e1..438c1fd7c00 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
@@ -103,6 +103,8 @@ public final class MessageGenerator {
 
     static final String SCHEMA_CLASS = 
"org.apache.kafka.common.protocol.types.Schema";
 
+    static final String NULLABLE_SCHEMA_CLASS = 
"org.apache.kafka.common.protocol.types.NullableSchema";
+
     static final String ARRAYOF_CLASS = 
"org.apache.kafka.common.protocol.types.ArrayOf";
 
     static final String COMPACT_ARRAYOF_CLASS = 
"org.apache.kafka.common.protocol.types.CompactArrayOf";
diff --git 
a/generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java 
b/generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java
index b1d4861015c..efb4cfb757e 100644
--- a/generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java
@@ -90,12 +90,10 @@ final class SchemaGenerator {
         }
 
         // Generate schemas for inline structures
-        generateSchemas(message.dataClassName(), message.struct(),
-            message.struct().versions());
+        generateSchemas(message.dataClassName(), message.struct(), 
message.struct().versions());
     }
 
-    void generateSchemas(String className, StructSpec struct,
-                         Versions parentVersions) {
+    void generateSchemas(String className, StructSpec struct, Versions 
parentVersions) {
         Versions versions = parentVersions.intersect(struct.versions());
         MessageInfo messageInfo = messages.get(className);
         if (messageInfo != null) {
@@ -297,9 +295,9 @@ final class SchemaGenerator {
         } else if (type.isRecords()) {
             headerGenerator.addImport(MessageGenerator.TYPE_CLASS);
             if (fieldFlexibleVersions.contains(version)) {
-                return "Type.COMPACT_RECORDS";
+                return nullable ? "Type.COMPACT_NULLABLE_RECORDS" : 
"Type.COMPACT_RECORDS";
             } else {
-                return "Type.RECORDS";
+                return nullable ? "Type.NULLABLE_RECORDS" : "Type.RECORDS";
             }
         } else if (type.isArray()) {
             if (fieldFlexibleVersions.contains(version)) {
@@ -317,8 +315,12 @@ final class SchemaGenerator {
                         fieldTypeToSchemaType(arrayType.elementType(), false, 
version, fieldFlexibleVersions, false));
             }
         } else if (type.isStruct()) {
-            return String.format("%s.SCHEMA_%d", type,
+            if (nullable) {
+                
headerGenerator.addImport(MessageGenerator.NULLABLE_SCHEMA_CLASS);
+            }
+            String schemaType = String.format("%s.SCHEMA_%d", type,
                 floorVersion(type.toString(), version));
+            return nullable ? String.format("new NullableSchema(%s)", 
schemaType) : schemaType;
         } else {
             throw new RuntimeException("Unsupported type " + type);
         }

Reply via email to