This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 86e53a7e83 Flink: Backport: Dynamic Sink: Document writeParallelism and fail on invalid configuration (#14758)
86e53a7e83 is described below

commit 86e53a7e83d1604b25535cf8ce34c39e8fd4fa1f
Author: Maximilian Michels <[email protected]>
AuthorDate: Thu Dec 4 11:25:41 2025 +0100

    Flink: Backport: Dynamic Sink: Document writeParallelism and fail on invalid configuration (#14758)
    
    backports #14191
---
 .../iceberg/flink/sink/dynamic/DynamicRecord.java  | 14 +++++++++++
 .../flink/sink/dynamic/HashKeyGenerator.java       | 22 ++++++++--------
 .../flink/sink/dynamic/TestHashKeyGenerator.java   | 29 ++++++++++++++++++++++
 .../iceberg/flink/sink/dynamic/DynamicRecord.java  | 14 +++++++++++
 .../flink/sink/dynamic/HashKeyGenerator.java       | 22 ++++++++--------
 .../flink/sink/dynamic/TestHashKeyGenerator.java   | 29 ++++++++++++++++++++++
 6 files changed, 110 insertions(+), 20 deletions(-)

diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java
index 600a4d8b95..9f44576608 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java
@@ -39,6 +39,20 @@ public class DynamicRecord {
   private boolean upsertMode;
   @Nullable private Set<String> equalityFields;
 
+  /**
+   * Constructs a new DynamicRecord.
+   *
+   * @param tableIdentifier The target table identifier.
+   * @param branch The target table branch.
+   * @param schema The target table schema.
+   * @param rowData The data matching the provided schema.
+   * @param partitionSpec The target table {@link PartitionSpec}.
+   * @param distributionMode The {@link DistributionMode}.
+   * @param writeParallelism The number of parallel writers. Can be set to any value {@literal > 0},
+   *     but will always be automatically capped by the maximum write parallelism, which is the
+   *     parallelism of the sink. Set to Integer.MAX_VALUE for always using the maximum available
+   *     write parallelism.
+   */
   public DynamicRecord(
       TableIdentifier tableIdentifier,
       String branch,
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
index 91aa4a9171..1c611c46b9 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
@@ -99,7 +99,7 @@ class HashKeyGenerator {
                         dynamicRecord.distributionMode(), DistributionMode.NONE),
                     MoreObjects.firstNonNull(
                         dynamicRecord.equalityFields(), Collections.emptySet()),
-                    dynamicRecord.writeParallelism()));
+                    Math.min(dynamicRecord.writeParallelism(), maxWriteParallelism)));
     try {
       return keySelector.getKey(
           overrideRowData != null ? overrideRowData : dynamicRecord.rowData());
@@ -242,15 +242,17 @@ class HashKeyGenerator {
         String tableName,
         int writeParallelism,
         int maxWriteParallelism) {
-      if (writeParallelism > maxWriteParallelism) {
-        LOG.warn(
-            "{}: writeParallelism {} is greater than maxWriteParallelism {}. Capping writeParallelism at {}",
-            tableName,
-            writeParallelism,
-            maxWriteParallelism,
-            maxWriteParallelism);
-        writeParallelism = maxWriteParallelism;
-      }
+      Preconditions.checkArgument(
+          writeParallelism > 0,
+          "%s: writeParallelism must be > 0 (is: %s)",
+          tableName,
+          writeParallelism);
+      Preconditions.checkArgument(
+          writeParallelism <= maxWriteParallelism,
+          "%s: writeParallelism (%s) must be <= maxWriteParallelism (%s)",
+          tableName,
+          writeParallelism,
+          maxWriteParallelism);
       this.wrapped = wrapped;
       this.writeParallelism = writeParallelism;
       this.distinctKeys = new int[writeParallelism];
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
index 8d559e9206..04246bf039 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.flink.sink.dynamic;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.util.Collections;
 import java.util.Map;
@@ -157,6 +158,34 @@ class TestHashKeyGenerator {
     assertThat(getSubTaskId(writeKey3, writeParallelism, maxWriteParallelism)).isEqualTo(0);
   }
 
+  @Test
+  void testFailOnNonPositiveWriteParallelism() {
+    final int maxWriteParallelism = 5;
+    HashKeyGenerator generator = new HashKeyGenerator(16, maxWriteParallelism);
+
+    assertThatThrownBy(
+        () -> {
+          getWriteKey(
+              generator,
+              PartitionSpec.unpartitioned(),
+              DistributionMode.NONE,
+              -1, // writeParallelism
+              Collections.emptySet(),
+              GenericRowData.of());
+        });
+
+    assertThatThrownBy(
+        () -> {
+          getWriteKey(
+              generator,
+              PartitionSpec.unpartitioned(),
+              DistributionMode.NONE,
+              0, // writeParallelism
+              Collections.emptySet(),
+              GenericRowData.of());
+        });
+  }
+
   @Test
   void testCapAtMaxWriteParallelism() throws Exception {
     int writeParallelism = 10;
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java
index 600a4d8b95..9f44576608 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java
@@ -39,6 +39,20 @@ public class DynamicRecord {
   private boolean upsertMode;
   @Nullable private Set<String> equalityFields;
 
+  /**
+   * Constructs a new DynamicRecord.
+   *
+   * @param tableIdentifier The target table identifier.
+   * @param branch The target table branch.
+   * @param schema The target table schema.
+   * @param rowData The data matching the provided schema.
+   * @param partitionSpec The target table {@link PartitionSpec}.
+   * @param distributionMode The {@link DistributionMode}.
+   * @param writeParallelism The number of parallel writers. Can be set to any value {@literal > 0},
+   *     but will always be automatically capped by the maximum write parallelism, which is the
+   *     parallelism of the sink. Set to Integer.MAX_VALUE for always using the maximum available
+   *     write parallelism.
+   */
   public DynamicRecord(
       TableIdentifier tableIdentifier,
       String branch,
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
index 91aa4a9171..1c611c46b9 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
@@ -99,7 +99,7 @@ class HashKeyGenerator {
                         dynamicRecord.distributionMode(), DistributionMode.NONE),
                     MoreObjects.firstNonNull(
                         dynamicRecord.equalityFields(), Collections.emptySet()),
-                    dynamicRecord.writeParallelism()));
+                    Math.min(dynamicRecord.writeParallelism(), maxWriteParallelism)));
     try {
       return keySelector.getKey(
           overrideRowData != null ? overrideRowData : dynamicRecord.rowData());
@@ -242,15 +242,17 @@ class HashKeyGenerator {
         String tableName,
         int writeParallelism,
         int maxWriteParallelism) {
-      if (writeParallelism > maxWriteParallelism) {
-        LOG.warn(
-            "{}: writeParallelism {} is greater than maxWriteParallelism {}. Capping writeParallelism at {}",
-            tableName,
-            writeParallelism,
-            maxWriteParallelism,
-            maxWriteParallelism);
-        writeParallelism = maxWriteParallelism;
-      }
+      Preconditions.checkArgument(
+          writeParallelism > 0,
+          "%s: writeParallelism must be > 0 (is: %s)",
+          tableName,
+          writeParallelism);
+      Preconditions.checkArgument(
+          writeParallelism <= maxWriteParallelism,
+          "%s: writeParallelism (%s) must be <= maxWriteParallelism (%s)",
+          tableName,
+          writeParallelism,
+          maxWriteParallelism);
       this.wrapped = wrapped;
       this.writeParallelism = writeParallelism;
       this.distinctKeys = new int[writeParallelism];
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
index 8d559e9206..04246bf039 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.flink.sink.dynamic;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.util.Collections;
 import java.util.Map;
@@ -157,6 +158,34 @@ class TestHashKeyGenerator {
     assertThat(getSubTaskId(writeKey3, writeParallelism, maxWriteParallelism)).isEqualTo(0);
   }
 
+  @Test
+  void testFailOnNonPositiveWriteParallelism() {
+    final int maxWriteParallelism = 5;
+    HashKeyGenerator generator = new HashKeyGenerator(16, maxWriteParallelism);
+
+    assertThatThrownBy(
+        () -> {
+          getWriteKey(
+              generator,
+              PartitionSpec.unpartitioned(),
+              DistributionMode.NONE,
+              -1, // writeParallelism
+              Collections.emptySet(),
+              GenericRowData.of());
+        });
+
+    assertThatThrownBy(
+        () -> {
+          getWriteKey(
+              generator,
+              PartitionSpec.unpartitioned(),
+              DistributionMode.NONE,
+              0, // writeParallelism
+              Collections.emptySet(),
+              GenericRowData.of());
+        });
+  }
+
   @Test
   void testCapAtMaxWriteParallelism() throws Exception {
     int writeParallelism = 10;

Reply via email to