This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 42cac92c48 Flink: Fix equalityFieldColumns always null in IcebergSink (#14952)
42cac92c48 is described below
commit 42cac92c4847407b8b56f976b83a0602d80dce0b
Author: GuoYu <[email protected]>
AuthorDate: Tue Jan 6 20:26:31 2026 +0800
Flink: Fix equalityFieldColumns always null in IcebergSink (#14952)
---
.../org/apache/iceberg/flink/sink/IcebergSink.java | 15 ++++++++--
.../TestFlinkIcebergSinkV2DistributionMode.java | 33 ++++++++++++++++++++++
2 files changed, 45 insertions(+), 3 deletions(-)
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
index 593230262f..470bbc41de 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
@@ -89,6 +89,7 @@ import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.SerializableSupplier;
import org.slf4j.Logger;
@@ -162,7 +163,9 @@ public class IcebergSink
private final transient FlinkMaintenanceConfig flinkMaintenanceConfig;
private final Table table;
- private final Set<String> equalityFieldColumns = null;
+ // This should only be used for logging/error messages. For any actual logic always use
+ // equalityFieldIds instead.
+ private final Set<String> equalityFieldColumns;
private IcebergSink(
TableLoader tableLoader,
@@ -176,7 +179,8 @@ public class IcebergSink
Set<Integer> equalityFieldIds,
String branch,
boolean overwriteMode,
- FlinkMaintenanceConfig flinkMaintenanceConfig) {
+ FlinkMaintenanceConfig flinkMaintenanceConfig,
+ Set<String> equalityFieldColumns) {
this.tableLoader = tableLoader;
this.snapshotProperties = snapshotProperties;
this.uidSuffix = uidSuffix;
@@ -198,6 +202,7 @@ public class IcebergSink
this.sinkId = UUID.randomUUID().toString();
this.compactMode = flinkWriteConf.compactMode();
this.flinkMaintenanceConfig = flinkMaintenanceConfig;
+ this.equalityFieldColumns = equalityFieldColumns;
}
@Override
@@ -666,6 +671,9 @@ public class IcebergSink
FlinkMaintenanceConfig flinkMaintenanceConfig =
new FlinkMaintenanceConfig(table, writeOptions, readableConfig);
+ Set<String> equalityFieldColumnsSet =
+ equalityFieldColumns != null ? Sets.newHashSet(equalityFieldColumns) : null;
+
return new IcebergSink(
tableLoader,
table,
@@ -680,7 +688,8 @@ public class IcebergSink
equalityFieldIds,
flinkWriteConf.branch(),
overwriteMode,
- flinkMaintenanceConfig);
+ flinkMaintenanceConfig,
+ equalityFieldColumnsSet);
}
/**
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2DistributionMode.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2DistributionMode.java
index 0feb4cc282..89f2c7b0da 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2DistributionMode.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2DistributionMode.java
@@ -562,6 +562,39 @@ public class TestFlinkIcebergSinkV2DistributionMode extends TestFlinkIcebergSink
}
}
+ @TestTemplate
+ public void testHashDistributionWithPartitionNotInEqualityFields() {
+ assumeThat(partitioned).isTrue();
+
+ List<Row> rows = createRows("");
+ DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);
+
+ if (isTableSchema) {
+ IcebergSink.forRow(dataStream, SimpleDataUtil.FLINK_TABLE_SCHEMA)
+ .table(table)
+ .tableLoader(tableLoader)
+ .writeParallelism(writeParallelism)
+ .distributionMode(DistributionMode.HASH)
+ .upsert(false)
+ .equalityFieldColumns(ImmutableList.of("id"))
+ .append();
+ } else {
+ IcebergSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+ .table(table)
+ .tableLoader(tableLoader)
+ .writeParallelism(writeParallelism)
+ .distributionMode(DistributionMode.HASH)
+ .upsert(false)
+ .equalityFieldColumns(ImmutableList.of("id"))
+ .append();
+ }
+
+ assertThatThrownBy(env::execute)
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining(
+ "In 'hash' distribution mode with equality fields set, source column 'data' of partition field '1000: data: identity(2)' should be included in equality fields: '[id]'");
+ }
+
private BoundedTestSource<Row> createRangeDistributionBoundedSource(
List<List<Row>> rowsPerCheckpoint) {
return new BoundedTestSource<>(rowsPerCheckpoint);