This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 8a51a68589 Flink: Backport: Fix HashKeyGenerator SelectorKey cache
ignoring writeParallelism and distributionMode (#15762)
8a51a68589 is described below
commit 8a51a685894eace51474f512b46a187566f27202
Author: Hayoung Lee <[email protected]>
AuthorDate: Thu Mar 26 14:35:58 2026 +0900
Flink: Backport: Fix HashKeyGenerator SelectorKey cache ignoring
writeParallelism and distributionMode (#15762)
---
.../flink/sink/dynamic/HashKeyGenerator.java | 29 +++++++--
.../flink/sink/dynamic/TestHashKeyGenerator.java | 68 ++++++++++++++++++++++
.../flink/sink/dynamic/HashKeyGenerator.java | 29 +++++++--
.../flink/sink/dynamic/TestHashKeyGenerator.java | 68 ++++++++++++++++++++++
4 files changed, 186 insertions(+), 8 deletions(-)
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
index 5eb31a9f70..fca45bf882 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
@@ -87,7 +87,9 @@ class HashKeyGenerator {
tableSpec != null ? tableSpec.specId() : null,
dynamicRecord.schema(),
dynamicRecord.spec(),
- dynamicRecord.equalityFields());
+ dynamicRecord.equalityFields(),
+ MoreObjects.firstNonNull(dynamicRecord.distributionMode(), DistributionMode.NONE),
+ Math.min(dynamicRecord.writeParallelism(), maxWriteParallelism));
KeySelector<RowData, Integer> keySelector =
keySelectorCache.computeIfAbsent(
cacheKey,
@@ -322,6 +324,8 @@ class HashKeyGenerator {
private final Schema schema;
private final PartitionSpec spec;
private final Set<String> equalityFields;
+ private final DistributionMode distributionMode;
+ private final int writeParallelism;
SelectorKey(
String tableName,
@@ -330,7 +334,9 @@ class HashKeyGenerator {
@Nullable Integer tableSpecId,
Schema schema,
PartitionSpec spec,
- Set<String> equalityFields) {
+ Set<String> equalityFields,
+ DistributionMode distributionMode,
+ int writeParallelism) {
this.tableName = tableName;
this.branch = branch;
this.schemaId = tableSchemaId;
@@ -338,6 +344,8 @@ class HashKeyGenerator {
this.schema = tableSchemaId == null ? schema : null;
this.spec = tableSpecId == null ? spec : null;
this.equalityFields = equalityFields;
+ this.distributionMode = distributionMode;
+ this.writeParallelism = writeParallelism;
}
@Override
@@ -357,12 +365,23 @@ class HashKeyGenerator {
&& Objects.equals(specId, that.specId)
&& Objects.equals(schema, that.schema)
&& Objects.equals(spec, that.spec)
- && Objects.equals(equalityFields, that.equalityFields);
+ && Objects.equals(equalityFields, that.equalityFields)
+ && distributionMode == that.distributionMode
+ && writeParallelism == that.writeParallelism;
}
@Override
public int hashCode() {
- return Objects.hash(tableName, branch, schemaId, specId, schema, spec, equalityFields);
+ return Objects.hash(
+ tableName,
+ branch,
+ schemaId,
+ specId,
+ schema,
+ spec,
+ equalityFields,
+ distributionMode,
+ writeParallelism);
}
@Override
@@ -375,6 +394,8 @@ class HashKeyGenerator {
.add("schema", schema)
.add("spec", spec)
.add("equalityFields", equalityFields)
+ .add("distributionMode", distributionMode)
+ .add("writeParallelism", writeParallelism)
.toString();
}
}
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
index 7c1d3c3d0a..c65f96b12c 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
@@ -401,6 +401,74 @@ class TestHashKeyGenerator {
assertThat(writeKey1).isEqualTo(writeKey3);
}
+ @Test
+ void testCacheMissOnWriteParallelismChange() throws Exception {
+ int maxWriteParallelism = 8;
+ HashKeyGenerator generator = new HashKeyGenerator(10, maxWriteParallelism);
+ Map<HashKeyGenerator.SelectorKey, KeySelector<RowData, Integer>> keySelectorCache =
+ generator.getKeySelectorCache();
+
+ PartitionSpec unpartitioned = PartitionSpec.unpartitioned();
+ DynamicRecord record1 =
+ new DynamicRecord(
+ TABLE_IDENTIFIER,
+ BRANCH,
+ SCHEMA,
+ GenericRowData.of(1, StringData.fromString("foo")),
+ unpartitioned,
+ DistributionMode.NONE,
+ 2);
+ DynamicRecord record2 =
+ new DynamicRecord(
+ TABLE_IDENTIFIER,
+ BRANCH,
+ SCHEMA,
+ GenericRowData.of(1, StringData.fromString("foo")),
+ unpartitioned,
+ DistributionMode.NONE,
+ 4);
+
+ generator.generateKey(record1);
+ assertThat(keySelectorCache).hasSize(1);
+
+ generator.generateKey(record2);
+ assertThat(keySelectorCache).hasSize(2);
+ }
+
+ @Test
+ void testCacheMissOnDistributionModeChange() throws Exception {
+ int maxWriteParallelism = 8;
+ HashKeyGenerator generator = new HashKeyGenerator(10, maxWriteParallelism);
+ Map<HashKeyGenerator.SelectorKey, KeySelector<RowData, Integer>> keySelectorCache =
+ generator.getKeySelectorCache();
+
+ PartitionSpec partitioned = PartitionSpec.builderFor(SCHEMA).identity("id").build();
+ DynamicRecord record1 =
+ new DynamicRecord(
+ TABLE_IDENTIFIER,
+ BRANCH,
+ SCHEMA,
+ GenericRowData.of(1, StringData.fromString("foo")),
+ partitioned,
+ DistributionMode.NONE,
+ 2);
+ DynamicRecord record2 =
+ new DynamicRecord(
+ TABLE_IDENTIFIER,
+ BRANCH,
+ SCHEMA,
+ GenericRowData.of(1, StringData.fromString("foo")),
+ partitioned,
+ DistributionMode.HASH,
+ 2);
+
+ generator.generateKey(record1);
+ assertThat(keySelectorCache).hasSize(1);
+
+ generator.generateKey(record2);
+ assertThat(keySelectorCache).hasSize(2);
+ }
+
private static int getWriteKey(
HashKeyGenerator generator,
PartitionSpec spec,
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
index 5eb31a9f70..fca45bf882 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
@@ -87,7 +87,9 @@ class HashKeyGenerator {
tableSpec != null ? tableSpec.specId() : null,
dynamicRecord.schema(),
dynamicRecord.spec(),
- dynamicRecord.equalityFields());
+ dynamicRecord.equalityFields(),
+ MoreObjects.firstNonNull(dynamicRecord.distributionMode(), DistributionMode.NONE),
+ Math.min(dynamicRecord.writeParallelism(), maxWriteParallelism));
KeySelector<RowData, Integer> keySelector =
keySelectorCache.computeIfAbsent(
cacheKey,
@@ -322,6 +324,8 @@ class HashKeyGenerator {
private final Schema schema;
private final PartitionSpec spec;
private final Set<String> equalityFields;
+ private final DistributionMode distributionMode;
+ private final int writeParallelism;
SelectorKey(
String tableName,
@@ -330,7 +334,9 @@ class HashKeyGenerator {
@Nullable Integer tableSpecId,
Schema schema,
PartitionSpec spec,
- Set<String> equalityFields) {
+ Set<String> equalityFields,
+ DistributionMode distributionMode,
+ int writeParallelism) {
this.tableName = tableName;
this.branch = branch;
this.schemaId = tableSchemaId;
@@ -338,6 +344,8 @@ class HashKeyGenerator {
this.schema = tableSchemaId == null ? schema : null;
this.spec = tableSpecId == null ? spec : null;
this.equalityFields = equalityFields;
+ this.distributionMode = distributionMode;
+ this.writeParallelism = writeParallelism;
}
@Override
@@ -357,12 +365,23 @@ class HashKeyGenerator {
&& Objects.equals(specId, that.specId)
&& Objects.equals(schema, that.schema)
&& Objects.equals(spec, that.spec)
- && Objects.equals(equalityFields, that.equalityFields);
+ && Objects.equals(equalityFields, that.equalityFields)
+ && distributionMode == that.distributionMode
+ && writeParallelism == that.writeParallelism;
}
@Override
public int hashCode() {
- return Objects.hash(tableName, branch, schemaId, specId, schema, spec, equalityFields);
+ return Objects.hash(
+ tableName,
+ branch,
+ schemaId,
+ specId,
+ schema,
+ spec,
+ equalityFields,
+ distributionMode,
+ writeParallelism);
}
@Override
@@ -375,6 +394,8 @@ class HashKeyGenerator {
.add("schema", schema)
.add("spec", spec)
.add("equalityFields", equalityFields)
+ .add("distributionMode", distributionMode)
+ .add("writeParallelism", writeParallelism)
.toString();
}
}
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
index 7c1d3c3d0a..c65f96b12c 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
@@ -401,6 +401,74 @@ class TestHashKeyGenerator {
assertThat(writeKey1).isEqualTo(writeKey3);
}
+ @Test
+ void testCacheMissOnWriteParallelismChange() throws Exception {
+ int maxWriteParallelism = 8;
+ HashKeyGenerator generator = new HashKeyGenerator(10, maxWriteParallelism);
+ Map<HashKeyGenerator.SelectorKey, KeySelector<RowData, Integer>> keySelectorCache =
+ generator.getKeySelectorCache();
+
+ PartitionSpec unpartitioned = PartitionSpec.unpartitioned();
+ DynamicRecord record1 =
+ new DynamicRecord(
+ TABLE_IDENTIFIER,
+ BRANCH,
+ SCHEMA,
+ GenericRowData.of(1, StringData.fromString("foo")),
+ unpartitioned,
+ DistributionMode.NONE,
+ 2);
+ DynamicRecord record2 =
+ new DynamicRecord(
+ TABLE_IDENTIFIER,
+ BRANCH,
+ SCHEMA,
+ GenericRowData.of(1, StringData.fromString("foo")),
+ unpartitioned,
+ DistributionMode.NONE,
+ 4);
+
+ generator.generateKey(record1);
+ assertThat(keySelectorCache).hasSize(1);
+
+ generator.generateKey(record2);
+ assertThat(keySelectorCache).hasSize(2);
+ }
+
+ @Test
+ void testCacheMissOnDistributionModeChange() throws Exception {
+ int maxWriteParallelism = 8;
+ HashKeyGenerator generator = new HashKeyGenerator(10, maxWriteParallelism);
+ Map<HashKeyGenerator.SelectorKey, KeySelector<RowData, Integer>> keySelectorCache =
+ generator.getKeySelectorCache();
+
+ PartitionSpec partitioned = PartitionSpec.builderFor(SCHEMA).identity("id").build();
+ DynamicRecord record1 =
+ new DynamicRecord(
+ TABLE_IDENTIFIER,
+ BRANCH,
+ SCHEMA,
+ GenericRowData.of(1, StringData.fromString("foo")),
+ partitioned,
+ DistributionMode.NONE,
+ 2);
+ DynamicRecord record2 =
+ new DynamicRecord(
+ TABLE_IDENTIFIER,
+ BRANCH,
+ SCHEMA,
+ GenericRowData.of(1, StringData.fromString("foo")),
+ partitioned,
+ DistributionMode.HASH,
+ 2);
+
+ generator.generateKey(record1);
+ assertThat(keySelectorCache).hasSize(1);
+
+ generator.generateKey(record2);
+ assertThat(keySelectorCache).hasSize(2);
+ }
+
private static int getWriteKey(
HashKeyGenerator generator,
PartitionSpec spec,