This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 42cac92c48 Flink: Fix equalityFieldColumns always null in IcebergSink (#14952)
42cac92c48 is described below

commit 42cac92c4847407b8b56f976b83a0602d80dce0b
Author: GuoYu <[email protected]>
AuthorDate: Tue Jan 6 20:26:31 2026 +0800

    Flink: Fix equalityFieldColumns always null in IcebergSink (#14952)
---
 .../org/apache/iceberg/flink/sink/IcebergSink.java | 15 ++++++++--
 .../TestFlinkIcebergSinkV2DistributionMode.java    | 33 ++++++++++++++++++++++
 2 files changed, 45 insertions(+), 3 deletions(-)

diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
index 593230262f..470bbc41de 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
@@ -89,6 +89,7 @@ import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.SerializableSupplier;
 import org.slf4j.Logger;
@@ -162,7 +163,9 @@ public class IcebergSink
   private final transient FlinkMaintenanceConfig flinkMaintenanceConfig;
 
   private final Table table;
-  private final Set<String> equalityFieldColumns = null;
+  // This should only be used for logging/error messages. For any actual logic always use
+  // equalityFieldIds instead.
+  private final Set<String> equalityFieldColumns;
 
   private IcebergSink(
       TableLoader tableLoader,
@@ -176,7 +179,8 @@ public class IcebergSink
       Set<Integer> equalityFieldIds,
       String branch,
       boolean overwriteMode,
-      FlinkMaintenanceConfig flinkMaintenanceConfig) {
+      FlinkMaintenanceConfig flinkMaintenanceConfig,
+      Set<String> equalityFieldColumns) {
     this.tableLoader = tableLoader;
     this.snapshotProperties = snapshotProperties;
     this.uidSuffix = uidSuffix;
@@ -198,6 +202,7 @@ public class IcebergSink
     this.sinkId = UUID.randomUUID().toString();
     this.compactMode = flinkWriteConf.compactMode();
     this.flinkMaintenanceConfig = flinkMaintenanceConfig;
+    this.equalityFieldColumns = equalityFieldColumns;
   }
 
   @Override
@@ -666,6 +671,9 @@ public class IcebergSink
       FlinkMaintenanceConfig flinkMaintenanceConfig =
           new FlinkMaintenanceConfig(table, writeOptions, readableConfig);
 
+      Set<String> equalityFieldColumnsSet =
+          equalityFieldColumns != null ? Sets.newHashSet(equalityFieldColumns) : null;
+
       return new IcebergSink(
           tableLoader,
           table,
@@ -680,7 +688,8 @@ public class IcebergSink
           equalityFieldIds,
           flinkWriteConf.branch(),
           overwriteMode,
-          flinkMaintenanceConfig);
+          flinkMaintenanceConfig,
+          equalityFieldColumnsSet);
     }
 
     /**
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2DistributionMode.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2DistributionMode.java
index 0feb4cc282..89f2c7b0da 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2DistributionMode.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2DistributionMode.java
@@ -562,6 +562,39 @@ public class TestFlinkIcebergSinkV2DistributionMode extends TestFlinkIcebergSink
     }
   }
 
+  @TestTemplate
+  public void testHashDistributionWithPartitionNotInEqualityFields() {
+    assumeThat(partitioned).isTrue();
+
+    List<Row> rows = createRows("");
+    DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);
+
+    if (isTableSchema) {
+      IcebergSink.forRow(dataStream, SimpleDataUtil.FLINK_TABLE_SCHEMA)
+          .table(table)
+          .tableLoader(tableLoader)
+          .writeParallelism(writeParallelism)
+          .distributionMode(DistributionMode.HASH)
+          .upsert(false)
+          .equalityFieldColumns(ImmutableList.of("id"))
+          .append();
+    } else {
+      IcebergSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+          .table(table)
+          .tableLoader(tableLoader)
+          .writeParallelism(writeParallelism)
+          .distributionMode(DistributionMode.HASH)
+          .upsert(false)
+          .equalityFieldColumns(ImmutableList.of("id"))
+          .append();
+    }
+
+    assertThatThrownBy(env::execute)
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessageContaining(
+            "In 'hash' distribution mode with equality fields set, source column 'data' of partition field '1000: data: identity(2)' should be included in equality fields: '[id]'");
+  }
+
   private BoundedTestSource<Row> createRangeDistributionBoundedSource(
       List<List<Row>> rowsPerCheckpoint) {
     return new BoundedTestSource<>(rowsPerCheckpoint);

Reply via email to