This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 575df4e89ee Set default redistribute key limit for KafkaIO read. (#36124)
575df4e89ee is described below

commit 575df4e89ee91f0135d03f8e5fa2b9e3fa02a5b7
Author: Tom Stepp <[email protected]>
AuthorDate: Tue Sep 16 19:29:11 2025 -0500

    Set default redistribute key limit for KafkaIO read. (#36124)
---
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 40 +++++++++++++++---
 .../org/apache/beam/sdk/io/kafka/KafkaIOTest.java  | 48 ++++++++++++++++++++++
 2 files changed, 83 insertions(+), 5 deletions(-)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index e048a996a8c..045a74a8507 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -655,6 +655,14 @@ public class KafkaIO {
 
   ///////////////////////// Read Support \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
 
+  /**
+   * Default number of keys to redistribute Kafka inputs into.
+   *
+   * <p>This value is used when {@link Read#withRedistribute()} is used without {@link
+   * Read#withRedistributeNumKeys(int redistributeNumKeys)}.
+   */
+  private static final int DEFAULT_REDISTRIBUTE_NUM_KEYS = 32768;
+
   /**
    * A {@link PTransform} to read from Kafka topics. See {@link KafkaIO} for more information on
    * usage and configuration.
@@ -1099,7 +1107,11 @@ public class KafkaIO {
      * @return an updated {@link Read} transform.
      */
     public Read<K, V> withRedistribute() {
-      return toBuilder().setRedistributed(true).build();
+      Builder<K, V> builder = toBuilder().setRedistributed(true);
+      if (getRedistributeNumKeys() == 0) {
+        builder = builder.setRedistributeNumKeys(DEFAULT_REDISTRIBUTE_NUM_KEYS);
+      }
+      return builder.build();
     }
 
     /**
@@ -1121,10 +1133,11 @@ public class KafkaIO {
      * Redistributes Kafka messages into a distinct number of keys for processing in subsequent
      * steps.
      *
-     * <p>Specifying an explicit number of keys is generally recommended over redistributing into an
-     * unbounded key space.
+     * <p>If unset, defaults to {@link KafkaIO#DEFAULT_REDISTRIBUTE_NUM_KEYS}.
      *
-     * <p>Must be used with {@link KafkaIO#withRedistribute()}.
+     * <p>Use zero to disable bucketing into a distinct number of keys.
+     *
+     * <p>Must be used with {@link Read#withRedistribute()}.
      *
      * @param redistributeNumKeys specifies the total number of keys for redistributing inputs.
      * @return an updated {@link Read} transform.
@@ -2667,13 +2680,30 @@ public class KafkaIO {
 
     /** Enable Redistribute. */
     public ReadSourceDescriptors<K, V> withRedistribute() {
-      return toBuilder().setRedistribute(true).build();
+      Builder<K, V> builder = toBuilder().setRedistribute(true);
+      if (getRedistributeNumKeys() == 0) {
+        builder = builder.setRedistributeNumKeys(DEFAULT_REDISTRIBUTE_NUM_KEYS);
+      }
+      return builder.build();
     }
 
     public ReadSourceDescriptors<K, V> withAllowDuplicates() {
       return toBuilder().setAllowDuplicates(true).build();
     }
 
+    /**
+     * Redistributes Kafka messages into a distinct number of keys for processing in subsequent
+     * steps.
+     *
+     * <p>If unset, defaults to {@link KafkaIO#DEFAULT_REDISTRIBUTE_NUM_KEYS}.
+     *
+     * <p>Use zero to disable bucketing into a distinct number of keys.
+     *
+     * <p>Must be used with {@link ReadSourceDescriptors#withRedistribute()}.
+     *
+     * @param redistributeNumKeys specifies the total number of keys for redistributing inputs.
+     * @return an updated {@link ReadSourceDescriptors} transform.
+     */
     public ReadSourceDescriptors<K, V> withRedistributeNumKeys(int redistributeNumKeys) {
       return toBuilder().setRedistributeNumKeys(redistributeNumKeys).build();
     }
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 3d441f8dc52..83c2e1b3882 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -30,6 +30,7 @@ import static org.hamcrest.Matchers.isA;
 import static org.hamcrest.Matchers.matchesPattern;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -792,6 +793,53 @@ public class KafkaIOTest {
     p.run();
   }
 
+  @Test
+  public void testDefaultRedistributeNumKeys() {
+    int numElements = 1000;
+    // Redistribute is not used and does not modify the read transform further.
+    KafkaIO.Read<Integer, Long> read =
+        mkKafkaReadTransform(
+            numElements,
+            numElements,
+            new ValueAsTimestampFn(),
+            false, /*redistribute*/
+            false, /*allowDuplicates*/
+            null, /*numKeys*/
+            null, /*offsetDeduplication*/
+            null /*topics*/);
+    assertFalse(read.isRedistributed());
+    assertEquals(0, read.getRedistributeNumKeys());
+
+    // Redistribute is used and defaulted the number of keys due to no user setting.
+    read =
+        mkKafkaReadTransform(
+            numElements,
+            numElements,
+            new ValueAsTimestampFn(),
+            true, /*redistribute*/
+            false, /*allowDuplicates*/
+            null, /*numKeys*/
+            null, /*offsetDeduplication*/
+            null /*topics*/);
+    assertTrue(read.isRedistributed());
+    // Default is defined by DEFAULT_REDISTRIBUTE_NUM_KEYS in KafkaIO.
+    assertEquals(32768, read.getRedistributeNumKeys());
+
+    // Redistribute is set with a user-specified number of keys.
+    read =
+        mkKafkaReadTransform(
+            numElements,
+            numElements,
+            new ValueAsTimestampFn(),
+            true, /*redistribute*/
+            false, /*allowDuplicates*/
+            10, /*numKeys*/
+            null, /*offsetDeduplication*/
+            null /*topics*/);
+    assertTrue(read.isRedistributed());
+    assertEquals(10, read.getRedistributeNumKeys());
+  }
+
   @Test
   public void testDisableRedistributeKafkaOffsetLegacy() {
     thrown.expect(Exception.class);

Reply via email to