This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 60436605e19 Add schema provider support for Kafka redistribute options (#36332)
60436605e19 is described below
commit 60436605e1983d9bc34f5be4aec881124e22b13d
Author: Tom Stepp <[email protected]>
AuthorDate: Sun Oct 5 13:51:12 2025 -0700
Add schema provider support for Kafka redistribute options (#36332)
* Add deterministic sharding unit test.
* Refactor to specific deterministic Kafka redistribute method.
* Add redistribute by key variant.
* Actually enable withRedistributeByRecordKey in KafkaIOTest.
* Add byRecordKey property to Kafka read compatibility.
* Rebase and revert method rename for debugging.
* Add schema provider for redistribute options
* Address spotless findings to simplify boolean expressions
* Revert accidental changes from merge conflict resolution
* Refactor into helper method.
---
.../KafkaReadSchemaTransformConfiguration.java | 30 ++++++++++++++++++++++
.../io/kafka/KafkaReadSchemaTransformProvider.java | 29 +++++++++++++++++++++
.../KafkaReadSchemaTransformProviderTest.java | 7 ++++-
3 files changed, 65 insertions(+), 1 deletion(-)
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
index 47e0b2a9aca..2ac8370099f 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
@@ -160,6 +160,26 @@ public abstract class KafkaReadSchemaTransformConfiguration {
@Nullable
public abstract ErrorHandling getErrorHandling();
+ @SchemaFieldDescription("If the Kafka read should be redistributed.")
+ @Nullable
+ public abstract Boolean getRedistributed();
+
+ @SchemaFieldDescription("If the Kafka read allows duplicates.")
+ @Nullable
+ public abstract Boolean getAllowDuplicates();
+
+ @SchemaFieldDescription("The number of keys for redistributing Kafka inputs.")
+ @Nullable
+ public abstract Integer getRedistributeNumKeys();
+
+ @SchemaFieldDescription("If the redistribute is using offset deduplication mode.")
+ @Nullable
+ public abstract Boolean getOffsetDeduplication();
+
+ @SchemaFieldDescription("If the redistribute keys by the Kafka record key.")
+ @Nullable
+ public abstract Boolean getRedistributeByRecordKey();
+
/** Builder for the {@link KafkaReadSchemaTransformConfiguration}. */
@AutoValue.Builder
public abstract static class Builder {
@@ -190,6 +210,16 @@ public abstract class KafkaReadSchemaTransformConfiguration {
public abstract Builder setErrorHandling(ErrorHandling errorHandling);
+ public abstract Builder setRedistributed(Boolean redistribute);
+
+ public abstract Builder setAllowDuplicates(Boolean allowDuplicates);
+
+ public abstract Builder setRedistributeNumKeys(Integer redistributeNumKeys);
+
+ public abstract Builder setOffsetDeduplication(Boolean offsetDeduplication);
+
+ public abstract Builder setRedistributeByRecordKey(Boolean redistributeByRecordKey);
+
/** Builds a {@link KafkaReadSchemaTransformConfiguration} instance. */
public abstract KafkaReadSchemaTransformConfiguration build();
}
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
index 57fac43640a..74f9b147bbd 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
@@ -166,6 +166,31 @@ public class KafkaReadSchemaTransformProvider
return SchemaRegistryProvider.UNSPECIFIED;
}
+ private static <K, V> KafkaIO.Read<K, V> applyRedistributeSettings(
+ KafkaIO.Read<K, V> kafkaRead, KafkaReadSchemaTransformConfiguration configuration) {
+ Boolean redistribute = configuration.getRedistributed();
+ if (redistribute != null && redistribute) {
+ kafkaRead = kafkaRead.withRedistribute();
+ }
+ Integer redistributeNumKeys = configuration.getRedistributeNumKeys();
+ if (redistributeNumKeys != null && redistributeNumKeys > 0) {
+ kafkaRead = kafkaRead.withRedistributeNumKeys(redistributeNumKeys);
+ }
+ Boolean allowDuplicates = configuration.getAllowDuplicates();
+ if (allowDuplicates != null) {
+ kafkaRead = kafkaRead.withAllowDuplicates(allowDuplicates);
+ }
+ Boolean redistributeByRecordKey = configuration.getRedistributeByRecordKey();
+ if (redistributeByRecordKey != null) {
+ kafkaRead = kafkaRead.withRedistributeByRecordKey(redistributeByRecordKey);
+ }
+ Boolean offsetDeduplication = configuration.getOffsetDeduplication();
+ if (offsetDeduplication != null) {
+ kafkaRead = kafkaRead.withOffsetDeduplication(offsetDeduplication);
+ }
+ return kafkaRead;
+ }
+
@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
configuration.validate();
@@ -233,6 +258,8 @@ public class KafkaReadSchemaTransformProvider
kafkaRead =
kafkaRead.withMaxReadTime(Duration.standardSeconds(maxReadTimeSeconds));
}
+ kafkaRead = applyRedistributeSettings(kafkaRead, configuration);
+
PCollection<GenericRecord> kafkaValues =
input.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create());
@@ -283,6 +310,8 @@ public class KafkaReadSchemaTransformProvider
kafkaRead =
kafkaRead.withMaxReadTime(Duration.standardSeconds(maxReadTimeSeconds));
}
+ kafkaRead = applyRedistributeSettings(kafkaRead, configuration);
+
PCollection<byte[]> kafkaValues =
input.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create());
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
index dc97dadf6e9..3c19f85c300 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
@@ -130,7 +130,12 @@ public class KafkaReadSchemaTransformProviderTest {
"error_handling",
"file_descriptor_path",
"message_name",
- "max_read_time_seconds"),
+ "max_read_time_seconds",
+ "redistributed",
+ "allow_duplicates",
+ "offset_deduplication",
+ "redistribute_num_keys",
+ "redistribute_by_record_key"),
kafkaProvider.configurationSchema().getFields().stream()
.map(field -> field.getName())
.collect(Collectors.toSet()));