This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 090b17bf535 added missing schemaFieldNumber annotation (#36489)
090b17bf535 is described below
commit 090b17bf535c7bcafd5072ef7f9d1a5d32a170e2
Author: Abdelrahman Ibrahim <[email protected]>
AuthorDate: Mon Oct 13 16:40:56 2025 +0300
added missing schemaFieldNumber annotation (#36489)
---
.../KafkaReadSchemaTransformConfiguration.java | 5 ++++
.../KafkaReadSchemaTransformProviderTest.java | 27 +++++++++++++++++++++-
2 files changed, 31 insertions(+), 1 deletion(-)
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
index ae812840fa8..0cf40f9b7eb 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
@@ -174,22 +174,27 @@ public abstract class KafkaReadSchemaTransformConfiguration {
public abstract ErrorHandling getErrorHandling();
@SchemaFieldDescription("If the Kafka read should be redistributed.")
+ @SchemaFieldNumber("12")
@Nullable
public abstract Boolean getRedistributed();
@SchemaFieldDescription("If the Kafka read allows duplicates.")
+ @SchemaFieldNumber("13")
@Nullable
public abstract Boolean getAllowDuplicates();
@SchemaFieldDescription("The number of keys for redistributing Kafka inputs.")
+ @SchemaFieldNumber("14")
@Nullable
public abstract Integer getRedistributeNumKeys();
@SchemaFieldDescription("If the redistribute is using offset deduplication mode.")
+ @SchemaFieldNumber("15")
@Nullable
public abstract Boolean getOffsetDeduplication();
@SchemaFieldDescription("If the redistribute keys by the Kafka record key.")
+ @SchemaFieldNumber("16")
@Nullable
public abstract Boolean getRedistributeByRecordKey();
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
index 7541eb84216..9d276fa0e55 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
@@ -380,7 +380,7 @@ public class KafkaReadSchemaTransformProviderTest {
Schema schema =
SchemaRegistry.createDefault().getSchema(KafkaReadSchemaTransformConfiguration.class);
- assertEquals(12, schema.getFieldCount());
+ assertEquals(17, schema.getFieldCount());
// Check field name, type, and nullability. Descriptions are not checked as they are not
// critical for serialization.
@@ -453,5 +453,30 @@ public class KafkaReadSchemaTransformProviderTest {
actualRowSchemaForErrorHandling.getField(0).getDescription()))))
.withDescription(schema.getField(11).getDescription()),
schema.getField(11));
+
+ assertEquals(
+ Schema.Field.nullable("redistributed", Schema.FieldType.BOOLEAN)
+ .withDescription(schema.getField(12).getDescription()),
+ schema.getField(12));
+
+ assertEquals(
+ Schema.Field.nullable("allowDuplicates", Schema.FieldType.BOOLEAN)
+ .withDescription(schema.getField(13).getDescription()),
+ schema.getField(13));
+
+ assertEquals(
+ Schema.Field.nullable("redistributeNumKeys", Schema.FieldType.INT32)
+ .withDescription(schema.getField(14).getDescription()),
+ schema.getField(14));
+
+ assertEquals(
+ Schema.Field.nullable("offsetDeduplication", Schema.FieldType.BOOLEAN)
+ .withDescription(schema.getField(15).getDescription()),
+ schema.getField(15));
+
+ assertEquals(
+ Schema.Field.nullable("redistributeByRecordKey", Schema.FieldType.BOOLEAN)
+ .withDescription(schema.getField(16).getDescription()),
+ schema.getField(16));
}
}