This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
     new a5f6f449b3 Flink: Fix flaky tests for Iceberg sink and improve assert description (#14044)
a5f6f449b3 is described below

commit a5f6f449b31dc5cf3da2d8e5e192250b2008f1a5
Author: Maximilian Michels <m...@apache.org>
AuthorDate: Thu Sep 11 11:23:27 2025 +0200

    Flink: Fix flaky tests for Iceberg sink and improve assert description (#14044)
---
 .../src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java | 6 +++++-
 .../org/apache/iceberg/flink/sink/TestFlinkIcebergSinkExtended.java | 4 ++--
 .../test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java | 4 ++--
 3 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index 0071abfd9a..d9c9f7ad3f 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -323,7 +323,11 @@ public class SimpleDataUtil {
 
     Snapshot snapshot = latestSnapshot(table, branch);
     if (snapshot == null) {
-      assertThat(expected).isEmpty();
+      assertThat(expected)
+          .as(
+              "No snapshot for table '%s', assuming expected data is empty. If that's not the case, the Flink job most likely did not checkpoint.",
+              table.name())
+          .isEmpty();
       return;
     }
 
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkExtended.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkExtended.java
index f3d735982f..018b877a01 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkExtended.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkExtended.java
@@ -134,7 +134,7 @@ public class TestFlinkIcebergSinkExtended extends TestFlinkIcebergSinkBase {
 
     List<Row> leftRows = createRows("left-");
     DataStream<Row> leftStream =
-        env.fromCollection(leftRows, ROW_TYPE_INFO)
+        env.addSource(createBoundedSource(leftRows), ROW_TYPE_INFO)
             .name("leftCustomSource")
             .uid("leftCustomSource");
     if (isTableSchema) {
@@ -157,7 +157,7 @@ public class TestFlinkIcebergSinkExtended extends TestFlinkIcebergSinkBase {
 
     List<Row> rightRows = createRows("right-");
     DataStream<Row> rightStream =
-        env.fromCollection(rightRows, ROW_TYPE_INFO)
+        env.addSource(createBoundedSource(rightRows), ROW_TYPE_INFO)
             .name("rightCustomSource")
             .uid("rightCustomSource");
     if (isTableSchema) {
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java
index dc9b9db45f..4f2b09ee55 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java
@@ -236,7 +236,7 @@ public class TestIcebergSink extends TestFlinkIcebergSinkBase {
 
     List<Row> leftRows = createRows("left-");
     DataStream<Row> leftStream =
-        env.fromCollection(leftRows, ROW_TYPE_INFO)
+        env.addSource(createBoundedSource(leftRows), ROW_TYPE_INFO)
             .name("leftCustomSource")
             .uid("leftCustomSource");
 
@@ -260,7 +260,7 @@ public class TestIcebergSink extends TestFlinkIcebergSinkBase {
 
     List<Row> rightRows = createRows("right-");
     DataStream<Row> rightStream =
-        env.fromCollection(rightRows, ROW_TYPE_INFO)
+        env.addSource(createBoundedSource(rightRows), ROW_TYPE_INFO)
             .name("rightCustomSource")
             .uid("rightCustomSource");