This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 58d62d15227 KAFKA-19634 Formalize nullable and non-nullable type
distinctions in protocol specification (#20614)
58d62d15227 is described below
commit 58d62d152275e29cb74a84e925624e1e9a681c8d
Author: Lan Ding <[email protected]>
AuthorDate: Fri Nov 28 21:50:26 2025 +0800
KAFKA-19634 Formalize nullable and non-nullable type distinctions in
protocol specification (#20614)
This patch introduces a clear separation between nullable and
non-nullable data structures. The key changes include:
1. Differentiates between nullable and non-nullable versions of
`RECORDS`, `COMPACT_RECORDS`, and `Schema` types.
2. Adds explicit nullable type names for `ArrayOf` and `CompactArrayOf`.
3. Introduces a new, concise syntax for representing types:
- `{}` for struct, `?{}` for nullable struct
- `[T]` for array, `?[T]` for nullable array
- `(T)` for compact array, `?(T)` for nullable compact array
4. Declares shared schemas as non-nullable `Schema` by default. A field
that references a shared schema and is nullable must be explicitly
declared as a new `NullableSchema(X)`.
5. Adds unit tests to verify the consistency between schema and message
serialization.
Reviewers: Jun Rao <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../org/apache/kafka/common/protocol/ApiKeys.java | 13 +-
.../org/apache/kafka/common/protocol/Protocol.java | 16 +-
.../kafka/common/protocol/types/ArrayOf.java | 33 +++-
.../kafka/common/protocol/types/BoundField.java | 2 +-
.../common/protocol/types/CompactArrayOf.java | 32 +++-
.../common/protocol/types/NullableSchema.java | 103 ++++++++++++
.../apache/kafka/common/protocol/types/Schema.java | 34 +++-
.../apache/kafka/common/protocol/types/Type.java | 160 +++++++++++++++---
.../src/main/resources/common/message/README.md | 6 +-
.../message/ProtocolRoundTripConsistencyTest.java | 180 +++++++++++++++++++++
.../protocol/types/ProtocolSerializationTest.java | 4 +-
.../kafka/common/protocol/types/TypeTest.java | 24 +--
.../resources/common/message/AllTypeMessage.json | 52 ++++++
.../org/apache/kafka/message/MessageGenerator.java | 2 +
.../org/apache/kafka/message/SchemaGenerator.java | 16 +-
15 files changed, 609 insertions(+), 68 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 89b952e6ce7..79b283b4f8d 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -35,7 +35,10 @@ import java.util.stream.Collectors;
import static org.apache.kafka.common.protocol.types.Type.BYTES;
import static org.apache.kafka.common.protocol.types.Type.COMPACT_BYTES;
import static
org.apache.kafka.common.protocol.types.Type.COMPACT_NULLABLE_BYTES;
+import static
org.apache.kafka.common.protocol.types.Type.COMPACT_NULLABLE_RECORDS;
+import static org.apache.kafka.common.protocol.types.Type.COMPACT_RECORDS;
import static org.apache.kafka.common.protocol.types.Type.NULLABLE_BYTES;
+import static org.apache.kafka.common.protocol.types.Type.NULLABLE_RECORDS;
import static org.apache.kafka.common.protocol.types.Type.RECORDS;
/**
@@ -135,7 +138,6 @@ public enum ApiKeys {
DESCRIBE_SHARE_GROUP_OFFSETS(ApiMessageType.DESCRIBE_SHARE_GROUP_OFFSETS),
ALTER_SHARE_GROUP_OFFSETS(ApiMessageType.ALTER_SHARE_GROUP_OFFSETS),
DELETE_SHARE_GROUP_OFFSETS(ApiMessageType.DELETE_SHARE_GROUP_OFFSETS);
-
private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>>
APIS_BY_LISTENER =
new EnumMap<>(ApiMessageType.ListenerType.class);
@@ -342,9 +344,14 @@ public enum ApiKeys {
Schema.Visitor detector = new Schema.Visitor() {
@Override
public void visit(Type field) {
- if (field == BYTES || field == NULLABLE_BYTES || field ==
RECORDS ||
- field == COMPACT_BYTES || field == COMPACT_NULLABLE_BYTES)
+ // avoid BooleanExpressionComplexity checkstyle warning
+ boolean isBytesType = field == BYTES || field ==
NULLABLE_BYTES ||
+ field == COMPACT_BYTES || field == COMPACT_NULLABLE_BYTES;
+ boolean isRecordsType = field == RECORDS || field ==
NULLABLE_RECORDS ||
+ field == COMPACT_RECORDS || field ==
COMPACT_NULLABLE_RECORDS;
+ if (isBytesType || isRecordsType) {
hasBuffer.set(true);
+ }
}
};
schema.walk(detector);
diff --git
a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 1b051d58bf0..e9d7609403c 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -40,13 +40,16 @@ public class Protocol {
final String indentStr = indentString(indentSize);
final Map<String, Type> subTypes = new LinkedHashMap<>();
+ b.append(schema.leftBracket());
+ b.append(" ");
// Top level fields
for (BoundField field: schema.fields()) {
Type type = field.def.type;
if (type.isArray()) {
- b.append("[");
+ b.append(type.leftBracket());
b.append(field.def.name);
- b.append("] ");
+ b.append(type.rightBracket());
+ b.append(" ");
if (!subTypes.containsKey(field.def.name)) {
subTypes.put(field.def.name,
type.arrayElementType().get());
}
@@ -54,9 +57,9 @@ public class Protocol {
Map<Integer, Field> taggedFields = new
TreeMap<>(((TaggedFields) type).fields());
taggedFields.forEach((tag, taggedField) -> {
if (taggedField.type.isArray()) {
- b.append("[");
+ // Use the tagged field's own type for the brackets: the enclosing `type` is the
+ // TaggedFields container, whose inherited leftBracket()/rightBracket() are empty.
+ b.append(taggedField.type.leftBracket());
b.append(taggedField.name);
- b.append("]");
+ b.append(taggedField.type.rightBracket());
if (!subTypes.containsKey(taggedField.name))
subTypes.put(taggedField.name + "<tag: " +
tag.toString() + ">", taggedField.type.arrayElementType().get());
} else {
@@ -75,6 +78,7 @@ public class Protocol {
subTypes.put(field.def.name, type);
}
}
+ b.append(schema.rightBracket());
b.append("\n");
// Sub Types/Schemas
@@ -227,14 +231,14 @@ public class Protocol {
b.append(" Response (Version: ");
b.append(version);
b.append(") => ");
- schemaToBnfHtml(responses[version], b, 2);
+ schemaToBnfHtml(schema, b, 2);
b.append("</pre>");
b.append("<p><b>Response header version:</b> ");
b.append(key.responseHeaderVersion((short) version));
b.append("</p>\n");
- schemaToFieldTableHtml(responses[version], b);
+ schemaToFieldTableHtml(schema, b);
b.append("</div>\n");
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
b/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
index 3333084ef66..17c827744b3 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
@@ -28,6 +28,8 @@ public class ArrayOf extends DocumentedType {
private static final String ARRAY_TYPE_NAME = "ARRAY";
+ private static final String NULLABLE_ARRAY_TYPE_NAME = "NULLABLE_ARRAY";
+
private final Type type;
private final boolean nullable;
@@ -97,9 +99,20 @@ public class ArrayOf extends DocumentedType {
return Optional.of(type);
}
+ @Override
+ public String leftBracket() {
+ return nullable ? "?[" : "[";
+ }
+
+ @Override
+ public String rightBracket() {
+ return "]";
+ }
+
@Override
public String toString() {
- return ARRAY_TYPE_NAME + "(" + type + ")";
+ String name = nullable ? NULLABLE_ARRAY_TYPE_NAME : ARRAY_TYPE_NAME;
+ return name + "(" + type + ")";
}
@Override
@@ -119,15 +132,27 @@ public class ArrayOf extends DocumentedType {
@Override
public String typeName() {
- return ARRAY_TYPE_NAME;
+ return nullable ? NULLABLE_ARRAY_TYPE_NAME : ARRAY_TYPE_NAME;
}
@Override
public String documentation() {
- return "Represents a sequence of objects of a given type T. " +
+ String doc;
+ if (nullable) {
+ doc = "Represents a sequence of objects of a given type T. " +
"Type T can be either a primitive type (e.g. " + STRING + ")
or a structure. " +
"First, the length N is given as an " + INT32 + ". Then N
instances of type T follow. " +
"A null array is represented with a length of -1. " +
- "In protocol documentation an array of T instances is referred
to as [T].";
+ "In protocol documentation a nullable array of T instances is
referred to as " +
+ leftBracket() + "T" + rightBracket() + ".";
+ } else {
+ doc = "Represents a sequence of objects of a given type T. " +
+ "Type T can be either a primitive type (e.g. " + STRING + ")
or a structure. " +
+ "First, the length N is given as an " + INT32 + ". Then N
instances of type T follow. " +
+ "In protocol documentation an array of T instances is referred
to as " +
+ leftBracket() + "T" + rightBracket() + ".";
+ }
+
+ return doc;
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/protocol/types/BoundField.java
b/clients/src/main/java/org/apache/kafka/common/protocol/types/BoundField.java
index b031b4fce82..e40931bc7ed 100644
---
a/clients/src/main/java/org/apache/kafka/common/protocol/types/BoundField.java
+++
b/clients/src/main/java/org/apache/kafka/common/protocol/types/BoundField.java
@@ -29,7 +29,7 @@ public class BoundField {
this.schema = schema;
this.index = index;
}
-
+
@Override
public String toString() {
return def.name + ":" + def.type;
diff --git
a/clients/src/main/java/org/apache/kafka/common/protocol/types/CompactArrayOf.java
b/clients/src/main/java/org/apache/kafka/common/protocol/types/CompactArrayOf.java
index 6a252a8a4cc..e41a4c0dc7f 100644
---
a/clients/src/main/java/org/apache/kafka/common/protocol/types/CompactArrayOf.java
+++
b/clients/src/main/java/org/apache/kafka/common/protocol/types/CompactArrayOf.java
@@ -30,6 +30,8 @@ import java.util.Optional;
public class CompactArrayOf extends DocumentedType {
private static final String COMPACT_ARRAY_TYPE_NAME = "COMPACT_ARRAY";
+ private static final String COMPACT_NULLABLE_ARRAY_TYPE_NAME =
"COMPACT_NULLABLE_ARRAY";
+
private final Type type;
private final boolean nullable;
@@ -103,9 +105,20 @@ public class CompactArrayOf extends DocumentedType {
return Optional.of(type);
}
+ @Override
+ public String leftBracket() {
+ return nullable ? "?(" : "(";
+ }
+
+ @Override
+ public String rightBracket() {
+ return ")";
+ }
+
@Override
public String toString() {
- return COMPACT_ARRAY_TYPE_NAME + "(" + type + ")";
+ String name = nullable ? COMPACT_NULLABLE_ARRAY_TYPE_NAME :
COMPACT_ARRAY_TYPE_NAME;
+ return name + "(" + type + ")";
}
@Override
@@ -125,15 +138,26 @@ public class CompactArrayOf extends DocumentedType {
@Override
public String typeName() {
- return COMPACT_ARRAY_TYPE_NAME;
+ return nullable ? COMPACT_NULLABLE_ARRAY_TYPE_NAME :
COMPACT_ARRAY_TYPE_NAME;
}
@Override
public String documentation() {
- return "Represents a sequence of objects of a given type T. " +
+ String doc;
+ if (nullable) {
+ doc = "Represents a sequence of objects of a given type T. " +
"Type T can be either a primitive type (e.g. " + STRING + ")
or a structure. " +
"First, the length N + 1 is given as an UNSIGNED_VARINT. Then
N instances of type T follow. " +
"A null array is represented with a length of 0. " +
- "In protocol documentation an array of T instances is referred
to as [T].";
+ "In protocol documentation a compact nullable array of T
instances is referred to as " +
+ leftBracket() + "T" + rightBracket() + ".";
+ } else {
+ doc = "Represents a sequence of objects of a given type T. " +
+ "Type T can be either a primitive type (e.g. " + STRING + ")
or a structure. " +
+ "First, the length N + 1 is given as an UNSIGNED_VARINT. Then
N instances of type T follow. " +
+ "In protocol documentation a compact array of T instances is
referred to as " +
+ leftBracket() + "T" + rightBracket() + ".";
+ }
+ return doc;
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/protocol/types/NullableSchema.java
b/clients/src/main/java/org/apache/kafka/common/protocol/types/NullableSchema.java
new file mode 100644
index 00000000000..f5011db5421
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/common/protocol/types/NullableSchema.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol.types;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * The nullable schema for a compound record definition
+ */
+public final class NullableSchema extends Schema {
+
+ private static final String NULLABLE_STRUCT_TYPE_NAME = "NULLABLE_STRUCT";
+
+ public NullableSchema(Schema schema) {
+ super(schema.tolerateMissingFieldsWithDefaults(),
Arrays.stream(schema.fields()).map(field -> field.def).toArray(Field[]::new));
+ }
+
+ @Override
+ public boolean isNullable() {
+ return true;
+ }
+
+ /**
+ * Write a struct to the buffer with special handling for null values
+ * If the input object is null, writes a byte value of -1 to the buffer as
a null indicator.
+ */
+ @Override
+ public void write(ByteBuffer buffer, Object o) {
+ if (o == null) {
+ buffer.put((byte) -1);
+ return;
+ }
+
+ buffer.put((byte) 1);
+ super.write(buffer, o);
+ }
+
+ @Override
+ public Struct read(ByteBuffer buffer) {
+ byte nullIndicator = buffer.get();
+ if (nullIndicator < 0)
+ return null;
+
+ return super.read(buffer);
+ }
+
+ @Override
+ public int sizeOf(Object o) {
+ if (o == null)
+ return 1;
+
+ return 1 + super.sizeOf(o);
+ }
+
+ @Override
+ public Struct validate(Object item) {
+ if (item == null)
+ return null;
+
+ return super.validate(item);
+ }
+
+ @Override
+ public String typeName() {
+ return NULLABLE_STRUCT_TYPE_NAME;
+ }
+
+ @Override
+ public String leftBracket() {
+ return "?{";
+ }
+
+ @Override
+ public String rightBracket() {
+ return "}";
+ }
+
+ /**
+ * Returns the human-readable description of the nullable struct type.
+ *
+ * @return the description text, ending with the bracket notation reported by
+ * {@link #leftBracket()} and {@link #rightBracket()}
+ */
+ @Override
+ public String documentation() {
+ return "A nullable struct is named by a string with a capitalized first letter and consists of one or more fields. " +
+ "It represents a composite object or null. " +
+ "For non-null values, the first byte has value 1, " +
+ "followed by the serialization of each field in the order they are defined. " +
+ // The trailing space keeps this sentence from running into the next one.
+ "A null value is encoded as a byte with value -1 and there are no following bytes. " +
+ "In protocol documentation a nullable struct containing multiple fields is enclosed by " +
+ leftBracket() + " and " + rightBracket() + ".";
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
index 325a77fe43f..9cfc51dd76f 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.common.protocol.types;
+import org.apache.kafka.common.protocol.types.Type.DocumentedType;
+
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
@@ -24,7 +26,9 @@ import java.util.Objects;
/**
* The schema for a compound record definition
*/
-public final class Schema extends Type {
+public class Schema extends DocumentedType {
+ private static final String STRUCT_TYPE_NAME = "STRUCT";
+
private static final Object[] NO_VALUES = new Object[0];
private final BoundField[] fields;
@@ -53,6 +57,7 @@ public final class Schema extends Type {
*
* @throws SchemaException If the given list have duplicate fields
*/
+ @SuppressWarnings("this-escape")
public Schema(boolean tolerateMissingFieldsWithDefaults, Field... fs) {
this.fields = new BoundField[fs.length];
this.fieldsByName = new HashMap<>();
@@ -173,6 +178,20 @@ public final class Schema extends Type {
return this.fields;
}
+ protected boolean tolerateMissingFieldsWithDefaults() {
+ return this.tolerateMissingFieldsWithDefaults;
+ }
+
+ @Override
+ public String leftBracket() {
+ return "{";
+ }
+
+ @Override
+ public String rightBracket() {
+ return "}";
+ }
+
/**
* Display a string representation of the schema
*/
@@ -206,6 +225,19 @@ public final class Schema extends Type {
}
}
+ @Override
+ public String typeName() {
+ return STRUCT_TYPE_NAME;
+ }
+
+ /**
+ * Returns the human-readable description of the struct type.
+ *
+ * @return the description text, ending with the bracket notation reported by
+ * {@link #leftBracket()} and {@link #rightBracket()}
+ */
+ @Override
+ public String documentation() {
+ return "A struct is named by a string with a capitalized first letter and consists of one or more fields. " +
+ // The trailing space keeps this sentence from running into the next one.
+ "It represents a composite object encoded as the serialization of each field in the order they are defined. " +
+ "In protocol documentation a struct containing multiple fields is enclosed by " +
+ leftBracket() + " and " + rightBracket() + ".";
+ }
+
public void walk(Visitor visitor) {
Objects.requireNonNull(visitor, "visitor must be non-null");
handleNode(this, visitor);
diff --git
a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
index f4c0ee5705c..91f578262f9 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
@@ -79,6 +79,20 @@ public abstract class Type {
return arrayElementType().isPresent();
}
+ /**
+ * For annotation in the generated html protocol doc.
+ */
+ public String leftBracket() {
+ return "";
+ }
+
+ /**
+ * For annotation in the generated html protocol doc.
+ */
+ public String rightBracket() {
+ return "";
+ }
+
/**
* A Type that can return its description for documentation purposes.
*/
@@ -774,7 +788,7 @@ public abstract class Type {
@Override
public String documentation() {
return "Represents a raw sequence of bytes. First the length N+1
is given as an UNSIGNED_VARINT." +
- "Then N bytes follow.";
+ " Then N bytes follow.";
}
};
@@ -912,23 +926,17 @@ public abstract class Type {
@Override
public String documentation() {
return "Represents a raw sequence of bytes. First the length N+1
is given as an UNSIGNED_VARINT." +
- "Then N bytes follow. A null object is represented with a
length of 0.";
+ " Then N bytes follow. A null object is represented with a
length of 0.";
}
};
- public static final DocumentedType COMPACT_RECORDS = new DocumentedType() {
- @Override
- public boolean isNullable() {
- return true;
- }
+ public static final DocumentedType RECORDS = new DocumentedType() {
@Override
public void write(ByteBuffer buffer, Object o) {
- if (o == null) {
- COMPACT_NULLABLE_BYTES.write(buffer, null);
- } else if (o instanceof MemoryRecords) {
+ if (o instanceof MemoryRecords) {
MemoryRecords records = (MemoryRecords) o;
- COMPACT_NULLABLE_BYTES.write(buffer,
records.buffer().duplicate());
+ BYTES.write(buffer, records.buffer().duplicate());
} else {
throw new IllegalArgumentException("Unexpected record type: "
+ o.getClass());
}
@@ -936,20 +944,57 @@ public abstract class Type {
@Override
public MemoryRecords read(ByteBuffer buffer) {
- ByteBuffer recordsBuffer = (ByteBuffer)
COMPACT_NULLABLE_BYTES.read(buffer);
- if (recordsBuffer == null) {
- return null;
- } else {
- return MemoryRecords.readableRecords(recordsBuffer);
- }
+ ByteBuffer recordsBuffer = (ByteBuffer) BYTES.read(buffer);
+ return MemoryRecords.readableRecords(recordsBuffer);
}
@Override
public int sizeOf(Object o) {
- if (o == null) {
- return 1;
+ BaseRecords records = (BaseRecords) o;
+ return 4 + records.sizeInBytes();
+ }
+
+ @Override
+ public String typeName() {
+ return "RECORDS";
+ }
+
+ @Override
+ public BaseRecords validate(Object item) {
+ if (item instanceof MemoryRecords)
+ return (BaseRecords) item;
+
+ throw new SchemaException(item + " is not an instance of " +
MemoryRecords.class.getName());
+ }
+
+ @Override
+ public String documentation() {
+ return "Represents a sequence of Kafka records as " + BYTES + ". "
+
+ "For a detailed description of records see " +
+ "<a href=\"/documentation/#messageformat\">Message Sets</a>.";
+ }
+ };
+
+ public static final DocumentedType COMPACT_RECORDS = new DocumentedType() {
+
+ @Override
+ public void write(ByteBuffer buffer, Object o) {
+ if (o instanceof MemoryRecords) {
+ MemoryRecords records = (MemoryRecords) o;
+ COMPACT_BYTES.write(buffer, records.buffer().duplicate());
+ } else {
+ throw new IllegalArgumentException("Unexpected record type: "
+ o.getClass());
}
+ }
+
+ @Override
+ public MemoryRecords read(ByteBuffer buffer) {
+ ByteBuffer recordsBuffer = (ByteBuffer) COMPACT_BYTES.read(buffer);
+ return MemoryRecords.readableRecords(recordsBuffer);
+ }
+ @Override
+ public int sizeOf(Object o) {
BaseRecords records = (BaseRecords) o;
int recordsSize = records.sizeInBytes();
return ByteUtils.sizeOfUnsignedVarint(recordsSize + 1) +
recordsSize;
@@ -962,9 +1007,6 @@ public abstract class Type {
@Override
public BaseRecords validate(Object item) {
- if (item == null)
- return null;
-
if (item instanceof BaseRecords)
return (BaseRecords) item;
@@ -973,13 +1015,13 @@ public abstract class Type {
@Override
public String documentation() {
- return "Represents a sequence of Kafka records as " +
COMPACT_NULLABLE_BYTES + ". " +
+ return "Represents a sequence of Kafka records as " +
COMPACT_BYTES + ". " +
"For a detailed description of records see " +
"<a href=\"/documentation/#messageformat\">Message Sets</a>.";
}
};
- public static final DocumentedType RECORDS = new DocumentedType() {
+ public static final DocumentedType NULLABLE_RECORDS = new DocumentedType()
{
@Override
public boolean isNullable() {
return true;
@@ -1018,7 +1060,7 @@ public abstract class Type {
@Override
public String typeName() {
- return "RECORDS";
+ return "NULLABLE_RECORDS";
}
@Override
@@ -1040,6 +1082,69 @@ public abstract class Type {
}
};
+ public static final DocumentedType COMPACT_NULLABLE_RECORDS = new
DocumentedType() {
+ @Override
+ public boolean isNullable() {
+ return true;
+ }
+
+ @Override
+ public void write(ByteBuffer buffer, Object o) {
+ if (o == null) {
+ COMPACT_NULLABLE_BYTES.write(buffer, null);
+ } else if (o instanceof MemoryRecords) {
+ MemoryRecords records = (MemoryRecords) o;
+ COMPACT_NULLABLE_BYTES.write(buffer,
records.buffer().duplicate());
+ } else {
+ throw new IllegalArgumentException("Unexpected record type: "
+ o.getClass());
+ }
+ }
+
+ @Override
+ public MemoryRecords read(ByteBuffer buffer) {
+ ByteBuffer recordsBuffer = (ByteBuffer)
COMPACT_NULLABLE_BYTES.read(buffer);
+ if (recordsBuffer == null) {
+ return null;
+ } else {
+ return MemoryRecords.readableRecords(recordsBuffer);
+ }
+ }
+
+ @Override
+ public int sizeOf(Object o) {
+ if (o == null) {
+ return 1;
+ }
+
+ BaseRecords records = (BaseRecords) o;
+ int recordsSize = records.sizeInBytes();
+ return ByteUtils.sizeOfUnsignedVarint(recordsSize + 1) +
recordsSize;
+ }
+
+ @Override
+ public String typeName() {
+ return "COMPACT_NULLABLE_RECORDS";
+ }
+
+ @Override
+ public BaseRecords validate(Object item) {
+ if (item == null)
+ return null;
+
+ if (item instanceof BaseRecords)
+ return (BaseRecords) item;
+
+ throw new SchemaException(item + " is not an instance of " +
BaseRecords.class.getName());
+ }
+
+ @Override
+ public String documentation() {
+ return "Represents a sequence of Kafka records as " +
COMPACT_NULLABLE_BYTES + ". " +
+ "For a detailed description of records see " +
+ "<a href=\"/documentation/#messageformat\">Message Sets</a>.";
+ }
+ };
+
public static final DocumentedType VARINT = new DocumentedType() {
@Override
public void write(ByteBuffer buffer, Object o) {
@@ -1116,7 +1221,10 @@ public abstract class Type {
UINT16, UNSIGNED_INT32, VARINT, VARLONG, UUID, FLOAT64,
STRING, COMPACT_STRING, NULLABLE_STRING, COMPACT_NULLABLE_STRING,
BYTES, COMPACT_BYTES, NULLABLE_BYTES, COMPACT_NULLABLE_BYTES,
- RECORDS, COMPACT_RECORDS, new ArrayOf(STRING), new
CompactArrayOf(COMPACT_STRING)};
+ RECORDS, COMPACT_RECORDS, NULLABLE_RECORDS,
COMPACT_NULLABLE_RECORDS,
+ new ArrayOf(STRING), new CompactArrayOf(COMPACT_STRING),
ArrayOf.nullable(STRING), CompactArrayOf.nullable(STRING),
+ new Schema(), new NullableSchema(new Schema())};
+
final StringBuilder b = new StringBuilder();
b.append("<table class=\"data-table\"><tbody>\n");
b.append("<tr>");
diff --git a/clients/src/main/resources/common/message/README.md
b/clients/src/main/resources/common/message/README.md
index 435635214f2..86c8f350ae3 100644
--- a/clients/src/main/resources/common/message/README.md
+++ b/clients/src/main/resources/common/message/README.md
@@ -91,6 +91,8 @@ There are several primitive field types available.
* "records": recordset such as memory recordset.
+* "struct": a composite object consisting of one or more fields.
+
In addition to these primitive field types, there is also an array type. Array
types start with a "[]" and end with the name of the element type. For
example, []Foo declares an array of "Foo" objects. Array fields have their own
@@ -101,8 +103,8 @@ Guide](https://kafka.apache.org/protocol.html).
Nullable Fields
---------------
-Booleans, ints, and floats can never be null. However, fields that are
strings,
-bytes, uuid, records, or arrays may optionally be "nullable". When a field is
+Booleans, ints, floats and uuid can never be null. However, fields that are
strings,
+bytes, records, struct, or arrays may optionally be "nullable". When a field
is
"nullable", that simply means that we are prepared to serialize and deserialize
null entries for that field.
diff --git
a/clients/src/test/java/org/apache/kafka/common/message/ProtocolRoundTripConsistencyTest.java
b/clients/src/test/java/org/apache/kafka/common/message/ProtocolRoundTripConsistencyTest.java
new file mode 100644
index 00000000000..d86cd2d543d
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/common/message/ProtocolRoundTripConsistencyTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.message;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.ObjectSerializationCache;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.MemoryRecords;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.TreeMap;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ProtocolRoundTripConsistencyTest {
+
+ private Struct nonFlexibleStruct;
+
+ private Struct flexibleStruct;
+
+ private AllTypeMessageData messageData;
+
+ @BeforeEach
+ public void setup() {
+ nonFlexibleStruct = new Struct(AllTypeMessageData.SCHEMA_0)
+ .set("my_boolean", false)
+ .set("my_int8", (byte) 12)
+ .set("my_int16", (short) 123)
+ .set("my_uint16", 33000)
+ .set("my_int32", 1234)
+ .set("my_uint32", 1234567L)
+ .set("my_uint64", 0xcafcacafcacafcaL)
+ .set("my_uuid", Uuid.fromString("H3KKO4NTRPaCWtEmm3vW7A"))
+ .set("my_float64", 12.34D)
+ .set("my_string", "string")
+ .set("my_nullable_string", null)
+ .set("my_bytes", ByteBuffer.wrap("bytes".getBytes()))
+ .set("my_nullable_bytes", null)
+ .set("my_records", MemoryRecords.EMPTY)
+ .set("my_nullable_records", null)
+ .set("my_int_array", new Object[] {})
+ .set("my_nullable_int_array", null);
+ nonFlexibleStruct.set("my_common_struct",
nonFlexibleStruct.instance("my_common_struct")
+ .set("foo", 123)
+ .set("bar", 123));
+
+ flexibleStruct = new Struct(AllTypeMessageData.SCHEMA_1)
+ .set("my_boolean", false)
+ .set("my_int8", (byte) 12)
+ .set("my_int16", (short) 123)
+ .set("my_uint16", 33000)
+ .set("my_int32", 1234)
+ .set("my_uint32", 1234567L)
+ .set("my_uint64", 0xcafcacafcacafcaL)
+ .set("my_uuid", Uuid.fromString("H3KKO4NTRPaCWtEmm3vW7A"))
+ .set("my_float64", 12.34D)
+ .set("my_compact_string", "compact string")
+ .set("my_compact_nullable_string", null)
+ .set("my_compact_bytes", ByteBuffer.wrap("compact
bytes".getBytes()))
+ .set("my_compact_nullable_bytes", null)
+ .set("my_compact_records", MemoryRecords.EMPTY)
+ .set("my_compact_nullable_records", null)
+ .set("my_int_array", new Object[] {})
+ .set("my_nullable_int_array", null)
+ .set("_tagged_fields", new TreeMap<Integer, Field>());
+ flexibleStruct.set("my_common_struct",
flexibleStruct.instance("my_common_struct")
+ .set("foo", 123)
+ .set("bar", 123)
+ .set("_tagged_fields", new TreeMap<Integer, Field>()));
+
+ messageData = new AllTypeMessageData();
+ }
+
+ @Test
+ public void testNonFlexibleWithNullDefault() {
+ messageData.setMyBytes("bytes".getBytes());
+ messageData.setMyRecords(MemoryRecords.EMPTY);
+
+ checkSchemaAndMessageRoundTripConsistency((short) 0, messageData,
nonFlexibleStruct);
+ }
+
+ @Test
+ public void testNonFlexibleWithNonNullValue() {
+ messageData.setMyBytes("bytes".getBytes());
+ messageData.setMyRecords(MemoryRecords.EMPTY);
+ messageData.setMyNullableString("nullable string");
+ messageData.setMyNullableBytes("nullable bytes".getBytes());
+ messageData.setMyNullableRecords(MemoryRecords.EMPTY);
+ messageData.setMyNullableIntArray(List.of(1, 2, 3));
+
+ nonFlexibleStruct.set("my_nullable_string", "nullable string")
+ .set("my_nullable_bytes", ByteBuffer.wrap("nullable
bytes".getBytes()))
+ .set("my_nullable_records", MemoryRecords.EMPTY)
+ .set("my_nullable_int_array", new Object[] {1, 2, 3});
+
+ checkSchemaAndMessageRoundTripConsistency((short) 0, messageData,
nonFlexibleStruct);
+ }
+
+ @Test
+ public void testFlexibleWithNullDefault() {
+ messageData.setMyCompactBytes("compact bytes".getBytes());
+ messageData.setMyCompactRecords(MemoryRecords.EMPTY);
+ messageData.setMyCommonStruct(null);
+
+ flexibleStruct.set("my_common_struct", null);
+
+ checkSchemaAndMessageRoundTripConsistency((short) 1, messageData,
flexibleStruct);
+ }
+
+ @Test
+ public void testFlexibleWithNonNullValue() {
+ messageData.setMyCompactBytes("compact bytes".getBytes());
+ messageData.setMyCompactRecords(MemoryRecords.EMPTY);
+ messageData.setMyCompactNullableString("compact nullable string");
+ messageData.setMyCompactNullableBytes("compact nullable
bytes".getBytes());
+ messageData.setMyCompactNullableRecords(MemoryRecords.EMPTY);
+ messageData.setMyNullableIntArray(List.of(1, 2, 3));
+
+ flexibleStruct.set("my_compact_nullable_string", "compact nullable
string")
+ .set("my_compact_nullable_bytes", ByteBuffer.wrap("compact
nullable bytes".getBytes()))
+ .set("my_compact_nullable_records", MemoryRecords.EMPTY)
+ .set("my_nullable_int_array", new Object[] {1, 2, 3});
+
+ checkSchemaAndMessageRoundTripConsistency((short) 1, messageData,
flexibleStruct);
+ }
+
+ private void checkSchemaAndMessageRoundTripConsistency(short version,
AllTypeMessageData message, Struct struct) {
+ ObjectSerializationCache cache = new ObjectSerializationCache();
+ ByteBuffer buf = ByteBuffer.allocate(message.size(cache, version));
+ ByteBufferAccessor serializedMessageAccessor = new
ByteBufferAccessor(buf);
+ // Serialize message
+ message.write(serializedMessageAccessor, cache, version);
+
+ ByteBuffer serializedSchemaBuffer =
ByteBuffer.allocate(struct.sizeOf());
+ // Serialize schema
+ struct.writeTo(serializedSchemaBuffer);
+
+ assertEquals(message.size(cache, version),
serializedMessageAccessor.buffer().position(),
+ "Buffer should be completely filled to message size.");
+ assertEquals(struct.sizeOf(), serializedSchemaBuffer.position(),
+ "Buffer should be completely filled to struct size.");
+ assertEquals(serializedSchemaBuffer.position(),
serializedMessageAccessor.buffer().position(),
+ "Generated and non-generated schema serializer should serialize to
the same length.");
+ assertEquals(serializedSchemaBuffer,
serializedMessageAccessor.buffer(),
+ "Generated and non-generated schema serializer should serialize to
the same content.");
+
+ serializedMessageAccessor.flip();
+ // Deserialize message
+ Schema schema = version == 0 ? AllTypeMessageData.SCHEMA_0 :
AllTypeMessageData.SCHEMA_1;
+ Struct deserializedStruct =
schema.read(serializedMessageAccessor.buffer());
+ assertEquals(struct, deserializedStruct, "Deserialized struct should
match original struct after round trip");
+
+ serializedSchemaBuffer.flip();
+ // Deserialize schema
+ AllTypeMessageData deserializedMessage = new AllTypeMessageData(new
ByteBufferAccessor(serializedSchemaBuffer), version);
+ assertEquals(message, deserializedMessage, "Deserialized message
should match original message after round trip");
+ }
+}
diff --git
a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
index 32ba528fe3e..de4e58f5d96 100644
---
a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
@@ -140,9 +140,9 @@ public class ProtocolSerializationTest {
check(new CompactArrayOf(Type.COMPACT_STRING),
new Object[] {"hello", "there", "beautiful"},
"COMPACT_ARRAY(COMPACT_STRING)");
- check(ArrayOf.nullable(Type.STRING), null, "ARRAY(STRING)");
+ check(ArrayOf.nullable(Type.STRING), null, "NULLABLE_ARRAY(STRING)");
check(CompactArrayOf.nullable(Type.COMPACT_STRING), null,
- "COMPACT_ARRAY(COMPACT_STRING)");
+ "COMPACT_NULLABLE_ARRAY(COMPACT_STRING)");
}
@Test
diff --git
a/clients/src/test/java/org/apache/kafka/common/protocol/types/TypeTest.java
b/clients/src/test/java/org/apache/kafka/common/protocol/types/TypeTest.java
index 5f2a027321c..7055503298f 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/types/TypeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/types/TypeTest.java
@@ -32,21 +32,21 @@ public class TypeTest {
@Test
public void testEmptyRecordsSerde() {
ByteBuffer buffer = ByteBuffer.allocate(4);
- Type.RECORDS.write(buffer, MemoryRecords.EMPTY);
+ Type.NULLABLE_RECORDS.write(buffer, MemoryRecords.EMPTY);
buffer.flip();
- assertEquals(4, Type.RECORDS.sizeOf(MemoryRecords.EMPTY));
+ assertEquals(4, Type.NULLABLE_RECORDS.sizeOf(MemoryRecords.EMPTY));
assertEquals(4, buffer.limit());
- assertEquals(MemoryRecords.EMPTY, Type.RECORDS.read(buffer));
+ assertEquals(MemoryRecords.EMPTY, Type.NULLABLE_RECORDS.read(buffer));
}
@Test
public void testNullRecordsSerde() {
ByteBuffer buffer = ByteBuffer.allocate(4);
- Type.RECORDS.write(buffer, null);
+ Type.NULLABLE_RECORDS.write(buffer, null);
buffer.flip();
- assertEquals(4, Type.RECORDS.sizeOf(MemoryRecords.EMPTY));
+ assertEquals(4, Type.NULLABLE_RECORDS.sizeOf(MemoryRecords.EMPTY));
assertEquals(4, buffer.limit());
- assertNull(Type.RECORDS.read(buffer));
+ assertNull(Type.NULLABLE_RECORDS.read(buffer));
}
@Test
@@ -63,21 +63,21 @@ public class TypeTest {
@Test
public void testEmptyCompactRecordsSerde() {
ByteBuffer buffer = ByteBuffer.allocate(4);
- Type.COMPACT_RECORDS.write(buffer, MemoryRecords.EMPTY);
+ Type.COMPACT_NULLABLE_RECORDS.write(buffer, MemoryRecords.EMPTY);
buffer.flip();
- assertEquals(1, Type.COMPACT_RECORDS.sizeOf(MemoryRecords.EMPTY));
+ assertEquals(1,
Type.COMPACT_NULLABLE_RECORDS.sizeOf(MemoryRecords.EMPTY));
assertEquals(1, buffer.limit());
- assertEquals(MemoryRecords.EMPTY, Type.COMPACT_RECORDS.read(buffer));
+ assertEquals(MemoryRecords.EMPTY,
Type.COMPACT_NULLABLE_RECORDS.read(buffer));
}
@Test
public void testNullCompactRecordsSerde() {
ByteBuffer buffer = ByteBuffer.allocate(4);
- Type.COMPACT_RECORDS.write(buffer, null);
+ Type.COMPACT_NULLABLE_RECORDS.write(buffer, null);
buffer.flip();
- assertEquals(1, Type.COMPACT_RECORDS.sizeOf(MemoryRecords.EMPTY));
+ assertEquals(1,
Type.COMPACT_NULLABLE_RECORDS.sizeOf(MemoryRecords.EMPTY));
assertEquals(1, buffer.limit());
- assertNull(Type.COMPACT_RECORDS.read(buffer));
+ assertNull(Type.COMPACT_NULLABLE_RECORDS.read(buffer));
}
@Test
diff --git a/clients/src/test/resources/common/message/AllTypeMessage.json
b/clients/src/test/resources/common/message/AllTypeMessage.json
new file mode 100644
index 00000000000..faadfd0fb79
--- /dev/null
+++ b/clients/src/test/resources/common/message/AllTypeMessage.json
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+{
+ "name": "AllTypeMessage",
+ "type": "header",
+ "validVersions": "0-1",
+ "flexibleVersions": "1+",
+ "fields": [
+ { "name": "myBoolean", "type": "bool", "versions": "0+", "default": false
},
+ { "name": "myInt8", "type": "int8", "versions": "0+", "default": "12" },
+ { "name": "myInt16", "type": "int16", "versions": "0+", "default": "123" },
+ { "name": "myUint16", "type": "uint16", "versions": "0+", "default":
"33000" },
+ { "name": "myInt32", "type": "int32", "versions": "0+", "default": "1234"
},
+ { "name": "myUint32", "type": "uint32", "versions": "0+", "default":
"1234567" },
+ { "name": "myUint64", "type": "int64", "versions": "0+", "default":
"0xcafcacafcacafca" },
+ { "name": "myUuid", "type": "uuid", "versions": "0+", "default":
"H3KKO4NTRPaCWtEmm3vW7A" },
+ { "name": "myFloat64", "type": "float64", "versions": "0+", "default":
"12.34" },
+ { "name": "myString", "type": "string", "versions": "0", "default":
"string" },
+ { "name": "myNullableString", "type": "string", "versions": "0",
"nullableVersions": "0", "default": "null" },
+ { "name": "myCompactString", "type": "string", "versions": "1", "default":
"compact string" },
+ { "name": "myCompactNullableString", "type": "string", "versions": "1",
"nullableVersions": "1", "default": "null" },
+ { "name": "myBytes", "type": "bytes", "versions": "0" },
+ { "name": "myNullableBytes", "type": "bytes", "versions": "0",
"nullableVersions": "0", "default": "null" },
+ { "name": "myCompactBytes", "type": "bytes", "versions": "1" },
+ { "name": "myCompactNullableBytes", "type": "bytes", "versions": "1",
"nullableVersions": "1", "default": "null" },
+ { "name": "myRecords", "type": "records", "versions": "0" },
+ { "name": "myNullableRecords", "type": "records", "versions": "0",
"nullableVersions": "0" },
+ { "name": "myCompactRecords", "type": "records", "versions": "1" },
+ { "name": "myCompactNullableRecords", "type": "records", "versions": "1",
"nullableVersions": "1" },
+ { "name": "myIntArray", "type": "[]int32", "versions": "0+" },
+ { "name": "myNullableIntArray", "type": "[]int32", "versions": "0+",
"nullableVersions": "0+", "default": "null" },
+ { "name": "myCommonStruct", "type": "TestCommonStruct", "versions": "0+",
"nullableVersions": "1+" }
+ ],
+ "commonStructs": [
+ { "name": "TestCommonStruct", "versions": "0+", "fields": [
+ { "name": "foo", "type": "int32", "default": "123", "versions": "0+" },
+ { "name": "bar", "type": "int32", "default": "123", "versions": "0+" }
+ ]}
+ ]
+}
\ No newline at end of file
diff --git
a/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
b/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
index e92a13288e1..438c1fd7c00 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
@@ -103,6 +103,8 @@ public final class MessageGenerator {
static final String SCHEMA_CLASS =
"org.apache.kafka.common.protocol.types.Schema";
+ static final String NULLABLE_SCHEMA_CLASS =
"org.apache.kafka.common.protocol.types.NullableSchema";
+
static final String ARRAYOF_CLASS =
"org.apache.kafka.common.protocol.types.ArrayOf";
static final String COMPACT_ARRAYOF_CLASS =
"org.apache.kafka.common.protocol.types.CompactArrayOf";
diff --git
a/generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java
b/generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java
index b1d4861015c..efb4cfb757e 100644
--- a/generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java
@@ -90,12 +90,10 @@ final class SchemaGenerator {
}
// Generate schemas for inline structures
- generateSchemas(message.dataClassName(), message.struct(),
- message.struct().versions());
+ generateSchemas(message.dataClassName(), message.struct(),
message.struct().versions());
}
- void generateSchemas(String className, StructSpec struct,
- Versions parentVersions) {
+ void generateSchemas(String className, StructSpec struct, Versions
parentVersions) {
Versions versions = parentVersions.intersect(struct.versions());
MessageInfo messageInfo = messages.get(className);
if (messageInfo != null) {
@@ -297,9 +295,9 @@ final class SchemaGenerator {
} else if (type.isRecords()) {
headerGenerator.addImport(MessageGenerator.TYPE_CLASS);
if (fieldFlexibleVersions.contains(version)) {
- return "Type.COMPACT_RECORDS";
+ return nullable ? "Type.COMPACT_NULLABLE_RECORDS" :
"Type.COMPACT_RECORDS";
} else {
- return "Type.RECORDS";
+ return nullable ? "Type.NULLABLE_RECORDS" : "Type.RECORDS";
}
} else if (type.isArray()) {
if (fieldFlexibleVersions.contains(version)) {
@@ -317,8 +315,12 @@ final class SchemaGenerator {
fieldTypeToSchemaType(arrayType.elementType(), false,
version, fieldFlexibleVersions, false));
}
} else if (type.isStruct()) {
- return String.format("%s.SCHEMA_%d", type,
+ if (nullable) {
+
headerGenerator.addImport(MessageGenerator.NULLABLE_SCHEMA_CLASS);
+ }
+ String schemaType = String.format("%s.SCHEMA_%d", type,
floorVersion(type.toString(), version));
+ return nullable ? String.format("new NullableSchema(%s)",
schemaType) : schemaType;
} else {
throw new RuntimeException("Unsupported type " + type);
}