This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 575df4e89ee Set default redistribute key limit for KafkaIO read. (#36124)
575df4e89ee is described below
commit 575df4e89ee91f0135d03f8e5fa2b9e3fa02a5b7
Author: Tom Stepp <[email protected]>
AuthorDate: Tue Sep 16 19:29:11 2025 -0500
Set default redistribute key limit for KafkaIO read. (#36124)
---
.../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 40 +++++++++++++++---
.../org/apache/beam/sdk/io/kafka/KafkaIOTest.java | 48 ++++++++++++++++++++++
2 files changed, 83 insertions(+), 5 deletions(-)
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index e048a996a8c..045a74a8507 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -655,6 +655,14 @@ public class KafkaIO {
///////////////////////// Read Support \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
+ /**
+ * Default number of keys to redistribute Kafka inputs into.
+ *
+ * <p>Applied by {@link Read#withRedistribute()} and {@link
+ * ReadSourceDescriptors#withRedistribute()} while the number of keys is still unset (zero).
+ */
+ private static final int DEFAULT_REDISTRIBUTE_NUM_KEYS = 32768;
+
/**
* A {@link PTransform} to read from Kafka topics. See {@link KafkaIO} for
more information on
* usage and configuration.
@@ -1099,7 +1107,11 @@ public class KafkaIO {
* @return an updated {@link Read} transform.
*/
public Read<K, V> withRedistribute() {
- return toBuilder().setRedistributed(true).build();
+ Builder<K, V> builder = toBuilder().setRedistributed(true);
+ if (getRedistributeNumKeys() == 0) {
+ builder =
builder.setRedistributeNumKeys(DEFAULT_REDISTRIBUTE_NUM_KEYS);
+ }
+ return builder.build();
}
/**
@@ -1121,10 +1133,11 @@ public class KafkaIO {
* Redistributes Kafka messages into a distinct number of keys for
processing in subsequent
* steps.
*
- * <p>Specifying an explicit number of keys is generally recommended over
redistributing into an
- * unbounded key space.
+ * <p>If unset, {@link Read#withRedistribute()} defaults this to 32768.
*
- * <p>Must be used with {@link KafkaIO#withRedistribute()}.
+ * <p>Use zero to disable key bucketing; set it after {@link Read#withRedistribute()}.
+ *
+ * <p>Must be used with {@link Read#withRedistribute()}.
*
* @param redistributeNumKeys specifies the total number of keys for
redistributing inputs.
* @return an updated {@link Read} transform.
@@ -2667,13 +2680,30 @@ public class KafkaIO {
/** Enable Redistribute. */
public ReadSourceDescriptors<K, V> withRedistribute() {
- return toBuilder().setRedistribute(true).build();
+ Builder<K, V> builder = toBuilder().setRedistribute(true);
+ if (getRedistributeNumKeys() == 0) {
+ builder =
builder.setRedistributeNumKeys(DEFAULT_REDISTRIBUTE_NUM_KEYS);
+ }
+ return builder.build();
}
public ReadSourceDescriptors<K, V> withAllowDuplicates() {
return toBuilder().setAllowDuplicates(true).build();
}
+ /**
+ * Redistributes Kafka messages into a distinct number of keys for processing in subsequent
+ * steps.
+ *
+ * <p>If unset, {@link #withRedistribute()} defaults this to 32768.
+ *
+ * <p>Use zero to disable key bucketing; set it after {@link #withRedistribute()}.
+ *
+ * <p>Must be used with {@link #withRedistribute()}.
+ *
+ * @param redistributeNumKeys specifies the total number of keys for redistributing inputs.
+ * @return an updated {@link ReadSourceDescriptors} transform.
+ */
public ReadSourceDescriptors<K, V> withRedistributeNumKeys(int
redistributeNumKeys) {
return toBuilder().setRedistributeNumKeys(redistributeNumKeys).build();
}
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 3d441f8dc52..83c2e1b3882 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -30,6 +30,7 @@ import static org.hamcrest.Matchers.isA;
import static org.hamcrest.Matchers.matchesPattern;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -792,6 +793,53 @@ public class KafkaIOTest {
p.run();
}
+ @Test
+ public void testDefaultRedistributeNumKeys() {
+ int numElements = 1000;
+ // Redistribute is not used and does not modify the read transform further.
+ KafkaIO.Read<Integer, Long> read =
+ mkKafkaReadTransform(
+ numElements,
+ numElements,
+ new ValueAsTimestampFn(),
+ false, /*redistribute*/
+ false, /*allowDuplicates*/
+ null, /*numKeys*/
+ null, /*offsetDeduplication*/
+ null /*topics*/);
+ assertFalse(read.isRedistributed());
+ assertEquals(0, read.getRedistributeNumKeys());
+
+ // Redistribute is used without a user-specified number of keys, so the default applies.
+ read =
+ mkKafkaReadTransform(
+ numElements,
+ numElements,
+ new ValueAsTimestampFn(),
+ true, /*redistribute*/
+ false, /*allowDuplicates*/
+ null, /*numKeys*/
+ null, /*offsetDeduplication*/
+ null /*topics*/);
+ assertTrue(read.isRedistributed());
+ // Default is defined by DEFAULT_REDISTRIBUTE_NUM_KEYS in KafkaIO.
+ assertEquals(32768, read.getRedistributeNumKeys());
+
+ // Redistribute is used with a user-specified number of keys.
+ read =
+ mkKafkaReadTransform(
+ numElements,
+ numElements,
+ new ValueAsTimestampFn(),
+ true, /*redistribute*/
+ false, /*allowDuplicates*/
+ 10, /*numKeys*/
+ null, /*offsetDeduplication*/
+ null /*topics*/);
+ assertTrue(read.isRedistributed());
+ assertEquals(10, read.getRedistributeNumKeys());
+ }
+
@Test
public void testDisableRedistributeKafkaOffsetLegacy() {
thrown.expect(Exception.class);