This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 8a51a68589 Flink: Backport: Fix HashKeyGenerator SelectorKey cache ignoring writeParallelism and distributionMode (#15762)
8a51a68589 is described below

commit 8a51a685894eace51474f512b46a187566f27202
Author: Hayoung Lee <[email protected]>
AuthorDate: Thu Mar 26 14:35:58 2026 +0900

    Flink: Backport: Fix HashKeyGenerator SelectorKey cache ignoring writeParallelism and distributionMode (#15762)
---
 .../flink/sink/dynamic/HashKeyGenerator.java       | 29 +++++++--
 .../flink/sink/dynamic/TestHashKeyGenerator.java   | 68 ++++++++++++++++++++++
 .../flink/sink/dynamic/HashKeyGenerator.java       | 29 +++++++--
 .../flink/sink/dynamic/TestHashKeyGenerator.java   | 68 ++++++++++++++++++++++
 4 files changed, 186 insertions(+), 8 deletions(-)

diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
index 5eb31a9f70..fca45bf882 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
@@ -87,7 +87,9 @@ class HashKeyGenerator {
             tableSpec != null ? tableSpec.specId() : null,
             dynamicRecord.schema(),
             dynamicRecord.spec(),
-            dynamicRecord.equalityFields());
+            dynamicRecord.equalityFields(),
+            MoreObjects.firstNonNull(dynamicRecord.distributionMode(), DistributionMode.NONE),
+            Math.min(dynamicRecord.writeParallelism(), maxWriteParallelism));
     KeySelector<RowData, Integer> keySelector =
         keySelectorCache.computeIfAbsent(
             cacheKey,
@@ -322,6 +324,8 @@ class HashKeyGenerator {
     private final Schema schema;
     private final PartitionSpec spec;
     private final Set<String> equalityFields;
+    private final DistributionMode distributionMode;
+    private final int writeParallelism;
 
     SelectorKey(
         String tableName,
@@ -330,7 +334,9 @@ class HashKeyGenerator {
         @Nullable Integer tableSpecId,
         Schema schema,
         PartitionSpec spec,
-        Set<String> equalityFields) {
+        Set<String> equalityFields,
+        DistributionMode distributionMode,
+        int writeParallelism) {
       this.tableName = tableName;
       this.branch = branch;
       this.schemaId = tableSchemaId;
@@ -338,6 +344,8 @@ class HashKeyGenerator {
       this.schema = tableSchemaId == null ? schema : null;
       this.spec = tableSpecId == null ? spec : null;
       this.equalityFields = equalityFields;
+      this.distributionMode = distributionMode;
+      this.writeParallelism = writeParallelism;
     }
 
     @Override
@@ -357,12 +365,23 @@ class HashKeyGenerator {
           && Objects.equals(specId, that.specId)
           && Objects.equals(schema, that.schema)
           && Objects.equals(spec, that.spec)
-          && Objects.equals(equalityFields, that.equalityFields);
+          && Objects.equals(equalityFields, that.equalityFields)
+          && distributionMode == that.distributionMode
+          && writeParallelism == that.writeParallelism;
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(tableName, branch, schemaId, specId, schema, spec, equalityFields);
+      return Objects.hash(
+          tableName,
+          branch,
+          schemaId,
+          specId,
+          schema,
+          spec,
+          equalityFields,
+          distributionMode,
+          writeParallelism);
     }
 
     @Override
@@ -375,6 +394,8 @@ class HashKeyGenerator {
           .add("schema", schema)
           .add("spec", spec)
           .add("equalityFields", equalityFields)
+          .add("distributionMode", distributionMode)
+          .add("writeParallelism", writeParallelism)
           .toString();
     }
   }
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
index 7c1d3c3d0a..c65f96b12c 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
@@ -401,6 +401,74 @@ class TestHashKeyGenerator {
     assertThat(writeKey1).isEqualTo(writeKey3);
   }
 
+  @Test
+  void testCacheMissOnWriteParallelismChange() throws Exception {
+    int maxWriteParallelism = 8;
+    HashKeyGenerator generator = new HashKeyGenerator(10, maxWriteParallelism);
+    Map<HashKeyGenerator.SelectorKey, KeySelector<RowData, Integer>> keySelectorCache =
+        generator.getKeySelectorCache();
+
+    PartitionSpec unpartitioned = PartitionSpec.unpartitioned();
+    DynamicRecord record1 =
+        new DynamicRecord(
+            TABLE_IDENTIFIER,
+            BRANCH,
+            SCHEMA,
+            GenericRowData.of(1, StringData.fromString("foo")),
+            unpartitioned,
+            DistributionMode.NONE,
+            2);
+    DynamicRecord record2 =
+        new DynamicRecord(
+            TABLE_IDENTIFIER,
+            BRANCH,
+            SCHEMA,
+            GenericRowData.of(1, StringData.fromString("foo")),
+            unpartitioned,
+            DistributionMode.NONE,
+            4);
+
+    generator.generateKey(record1);
+    assertThat(keySelectorCache).hasSize(1);
+
+    generator.generateKey(record2);
+    assertThat(keySelectorCache).hasSize(2);
+  }
+
+  @Test
+  void testCacheMissOnDistributionModeChange() throws Exception {
+    int maxWriteParallelism = 8;
+    HashKeyGenerator generator = new HashKeyGenerator(10, maxWriteParallelism);
+    Map<HashKeyGenerator.SelectorKey, KeySelector<RowData, Integer>> keySelectorCache =
+        generator.getKeySelectorCache();
+
+    PartitionSpec partitioned = PartitionSpec.builderFor(SCHEMA).identity("id").build();
+    DynamicRecord record1 =
+        new DynamicRecord(
+            TABLE_IDENTIFIER,
+            BRANCH,
+            SCHEMA,
+            GenericRowData.of(1, StringData.fromString("foo")),
+            partitioned,
+            DistributionMode.NONE,
+            2);
+    DynamicRecord record2 =
+        new DynamicRecord(
+            TABLE_IDENTIFIER,
+            BRANCH,
+            SCHEMA,
+            GenericRowData.of(1, StringData.fromString("foo")),
+            partitioned,
+            DistributionMode.HASH,
+            2);
+
+    generator.generateKey(record1);
+    assertThat(keySelectorCache).hasSize(1);
+
+    generator.generateKey(record2);
+    assertThat(keySelectorCache).hasSize(2);
+  }
+
   private static int getWriteKey(
       HashKeyGenerator generator,
       PartitionSpec spec,
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
index 5eb31a9f70..fca45bf882 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
@@ -87,7 +87,9 @@ class HashKeyGenerator {
             tableSpec != null ? tableSpec.specId() : null,
             dynamicRecord.schema(),
             dynamicRecord.spec(),
-            dynamicRecord.equalityFields());
+            dynamicRecord.equalityFields(),
+            MoreObjects.firstNonNull(dynamicRecord.distributionMode(), DistributionMode.NONE),
+            Math.min(dynamicRecord.writeParallelism(), maxWriteParallelism));
     KeySelector<RowData, Integer> keySelector =
         keySelectorCache.computeIfAbsent(
             cacheKey,
@@ -322,6 +324,8 @@ class HashKeyGenerator {
     private final Schema schema;
     private final PartitionSpec spec;
     private final Set<String> equalityFields;
+    private final DistributionMode distributionMode;
+    private final int writeParallelism;
 
     SelectorKey(
         String tableName,
@@ -330,7 +334,9 @@ class HashKeyGenerator {
         @Nullable Integer tableSpecId,
         Schema schema,
         PartitionSpec spec,
-        Set<String> equalityFields) {
+        Set<String> equalityFields,
+        DistributionMode distributionMode,
+        int writeParallelism) {
       this.tableName = tableName;
       this.branch = branch;
       this.schemaId = tableSchemaId;
@@ -338,6 +344,8 @@ class HashKeyGenerator {
       this.schema = tableSchemaId == null ? schema : null;
       this.spec = tableSpecId == null ? spec : null;
       this.equalityFields = equalityFields;
+      this.distributionMode = distributionMode;
+      this.writeParallelism = writeParallelism;
     }
 
     @Override
@@ -357,12 +365,23 @@ class HashKeyGenerator {
           && Objects.equals(specId, that.specId)
           && Objects.equals(schema, that.schema)
           && Objects.equals(spec, that.spec)
-          && Objects.equals(equalityFields, that.equalityFields);
+          && Objects.equals(equalityFields, that.equalityFields)
+          && distributionMode == that.distributionMode
+          && writeParallelism == that.writeParallelism;
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(tableName, branch, schemaId, specId, schema, spec, equalityFields);
+      return Objects.hash(
+          tableName,
+          branch,
+          schemaId,
+          specId,
+          schema,
+          spec,
+          equalityFields,
+          distributionMode,
+          writeParallelism);
     }
 
     @Override
@@ -375,6 +394,8 @@ class HashKeyGenerator {
           .add("schema", schema)
           .add("spec", spec)
           .add("equalityFields", equalityFields)
+          .add("distributionMode", distributionMode)
+          .add("writeParallelism", writeParallelism)
           .toString();
     }
   }
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
index 7c1d3c3d0a..c65f96b12c 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
@@ -401,6 +401,74 @@ class TestHashKeyGenerator {
     assertThat(writeKey1).isEqualTo(writeKey3);
   }
 
+  @Test
+  void testCacheMissOnWriteParallelismChange() throws Exception {
+    int maxWriteParallelism = 8;
+    HashKeyGenerator generator = new HashKeyGenerator(10, maxWriteParallelism);
+    Map<HashKeyGenerator.SelectorKey, KeySelector<RowData, Integer>> keySelectorCache =
+        generator.getKeySelectorCache();
+
+    PartitionSpec unpartitioned = PartitionSpec.unpartitioned();
+    DynamicRecord record1 =
+        new DynamicRecord(
+            TABLE_IDENTIFIER,
+            BRANCH,
+            SCHEMA,
+            GenericRowData.of(1, StringData.fromString("foo")),
+            unpartitioned,
+            DistributionMode.NONE,
+            2);
+    DynamicRecord record2 =
+        new DynamicRecord(
+            TABLE_IDENTIFIER,
+            BRANCH,
+            SCHEMA,
+            GenericRowData.of(1, StringData.fromString("foo")),
+            unpartitioned,
+            DistributionMode.NONE,
+            4);
+
+    generator.generateKey(record1);
+    assertThat(keySelectorCache).hasSize(1);
+
+    generator.generateKey(record2);
+    assertThat(keySelectorCache).hasSize(2);
+  }
+
+  @Test
+  void testCacheMissOnDistributionModeChange() throws Exception {
+    int maxWriteParallelism = 8;
+    HashKeyGenerator generator = new HashKeyGenerator(10, maxWriteParallelism);
+    Map<HashKeyGenerator.SelectorKey, KeySelector<RowData, Integer>> keySelectorCache =
+        generator.getKeySelectorCache();
+
+    PartitionSpec partitioned = PartitionSpec.builderFor(SCHEMA).identity("id").build();
+    DynamicRecord record1 =
+        new DynamicRecord(
+            TABLE_IDENTIFIER,
+            BRANCH,
+            SCHEMA,
+            GenericRowData.of(1, StringData.fromString("foo")),
+            partitioned,
+            DistributionMode.NONE,
+            2);
+    DynamicRecord record2 =
+        new DynamicRecord(
+            TABLE_IDENTIFIER,
+            BRANCH,
+            SCHEMA,
+            GenericRowData.of(1, StringData.fromString("foo")),
+            partitioned,
+            DistributionMode.HASH,
+            2);
+
+    generator.generateKey(record1);
+    assertThat(keySelectorCache).hasSize(1);
+
+    generator.generateKey(record2);
+    assertThat(keySelectorCache).hasSize(2);
+  }
+
   private static int getWriteKey(
       HashKeyGenerator generator,
       PartitionSpec spec,

Reply via email to