This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 090b17bf535 added missing schemaFieldNumber annotation (#36489)
090b17bf535 is described below

commit 090b17bf535c7bcafd5072ef7f9d1a5d32a170e2
Author: Abdelrahman Ibrahim <[email protected]>
AuthorDate: Mon Oct 13 16:40:56 2025 +0300

    added missing schemaFieldNumber annotation (#36489)
---
 .../KafkaReadSchemaTransformConfiguration.java     |  5 ++++
 .../KafkaReadSchemaTransformProviderTest.java      | 27 +++++++++++++++++++++-
 2 files changed, 31 insertions(+), 1 deletion(-)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
index ae812840fa8..0cf40f9b7eb 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
@@ -174,22 +174,27 @@ public abstract class KafkaReadSchemaTransformConfiguration {
   public abstract ErrorHandling getErrorHandling();
 
   @SchemaFieldDescription("If the Kafka read should be redistributed.")
+  @SchemaFieldNumber("12")
   @Nullable
   public abstract Boolean getRedistributed();
 
   @SchemaFieldDescription("If the Kafka read allows duplicates.")
+  @SchemaFieldNumber("13")
   @Nullable
   public abstract Boolean getAllowDuplicates();
 
   @SchemaFieldDescription("The number of keys for redistributing Kafka inputs.")
+  @SchemaFieldNumber("14")
   @Nullable
   public abstract Integer getRedistributeNumKeys();
 
   @SchemaFieldDescription("If the redistribute is using offset deduplication mode.")
+  @SchemaFieldNumber("15")
   @Nullable
   public abstract Boolean getOffsetDeduplication();
 
   @SchemaFieldDescription("If the redistribute keys by the Kafka record key.")
+  @SchemaFieldNumber("16")
   @Nullable
   public abstract Boolean getRedistributeByRecordKey();
 
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
index 7541eb84216..9d276fa0e55 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
@@ -380,7 +380,7 @@ public class KafkaReadSchemaTransformProviderTest {
     Schema schema =
         SchemaRegistry.createDefault().getSchema(KafkaReadSchemaTransformConfiguration.class);
 
-    assertEquals(12, schema.getFieldCount());
+    assertEquals(17, schema.getFieldCount());
 
     // Check field name, type, and nullability. Descriptions are not checked as they are not
     // critical for serialization.
@@ -453,5 +453,30 @@ public class KafkaReadSchemaTransformProviderTest {
                                 actualRowSchemaForErrorHandling.getField(0).getDescription()))))
             .withDescription(schema.getField(11).getDescription()),
         schema.getField(11));
+
+    assertEquals(
+        Schema.Field.nullable("redistributed", Schema.FieldType.BOOLEAN)
+            .withDescription(schema.getField(12).getDescription()),
+        schema.getField(12));
+
+    assertEquals(
+        Schema.Field.nullable("allowDuplicates", Schema.FieldType.BOOLEAN)
+            .withDescription(schema.getField(13).getDescription()),
+        schema.getField(13));
+
+    assertEquals(
+        Schema.Field.nullable("redistributeNumKeys", Schema.FieldType.INT32)
+            .withDescription(schema.getField(14).getDescription()),
+        schema.getField(14));
+
+    assertEquals(
+        Schema.Field.nullable("offsetDeduplication", Schema.FieldType.BOOLEAN)
+            .withDescription(schema.getField(15).getDescription()),
+        schema.getField(15));
+
+    assertEquals(
+        Schema.Field.nullable("redistributeByRecordKey", Schema.FieldType.BOOLEAN)
+            .withDescription(schema.getField(16).getDescription()),
+        schema.getField(16));
   }
 }

Reply via email to