This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 10a856e0f4 Flink: Proper backport for #8852 (#9146)
10a856e0f4 is described below

commit 10a856e0f4e6bd1653ed66a5ba232277834b9a8d
Author: pvary <[email protected]>
AuthorDate: Fri Nov 24 17:57:04 2023 +0100

    Flink: Proper backport for #8852 (#9146)
---
 .../iceberg/flink/source/TestIcebergSourceFailover.java  |  9 +++++----
 .../flink/source/TestStreamingMonitorFunction.java       | 16 ++++++----------
 .../iceberg/flink/source/TestIcebergSourceFailover.java  |  9 +++++----
 .../flink/source/TestStreamingMonitorFunction.java       | 15 +++++----------
 4 files changed, 21 insertions(+), 28 deletions(-)

diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
index 7186db21f7..70e7a79d83 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iceberg.flink.source;
 
-import static org.apache.iceberg.flink.SimpleDataUtil.assertTableRecords;
-
 import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -46,6 +44,7 @@ import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.FlinkConfigOptions;
 import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.sink.FlinkSink;
 import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
@@ -151,7 +150,8 @@ public class TestIcebergSourceFailover {
         RecordCounterToFail::continueProcessing,
         miniClusterResource.getMiniCluster());
 
-    assertTableRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
+    SimpleDataUtil.assertTableRecords(
+        sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
   }
 
   @Test
@@ -214,7 +214,8 @@ public class TestIcebergSourceFailover {
 
     // wait longer for continuous source to reduce flakiness
     // because CI servers tend to be overloaded.
-    assertTableRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
+    SimpleDataUtil.assertTableRecords(
+        sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
   }
 
   // ------------------------------------------------------------------------
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
index 132bbd7b2a..8af1dd883f 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
@@ -109,8 +109,7 @@ public class TestStreamingMonitorFunction extends TableTestBase {
       harness.setup();
       harness.open();
 
-      CountDownLatch latch = new CountDownLatch(1);
-      TestSourceContext sourceContext = new TestSourceContext(latch);
+      TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
       runSourceFunctionInTask(sourceContext, function);
 
       awaitExpectedSplits(sourceContext);
@@ -143,8 +142,7 @@ public class TestStreamingMonitorFunction extends TableTestBase {
       harness.setup();
       harness.open();
 
-      CountDownLatch latch = new CountDownLatch(1);
-      TestSourceContext sourceContext = new TestSourceContext(latch);
+      TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
       runSourceFunctionInTask(sourceContext, function);
 
       awaitExpectedSplits(sourceContext);
@@ -176,11 +174,11 @@ public class TestStreamingMonitorFunction extends TableTestBase {
       harness.setup();
       harness.open();
 
-      CountDownLatch latch = new CountDownLatch(1);
-      TestSourceContext sourceContext = new TestSourceContext(latch);
+      TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
       runSourceFunctionInTask(sourceContext, function);
 
       awaitExpectedSplits(sourceContext);
+
       // Stop the stream task.
       function.close();
 
@@ -200,8 +198,7 @@ public class TestStreamingMonitorFunction extends TableTestBase {
       harness.setup();
       harness.open();
 
-      CountDownLatch latch = new CountDownLatch(1);
-      TestSourceContext sourceContext = new TestSourceContext(latch);
+      TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
       runSourceFunctionInTask(sourceContext, func);
 
       awaitExpectedSplits(sourceContext);
@@ -223,8 +220,7 @@ public class TestStreamingMonitorFunction extends TableTestBase {
       harness.initializeState(state);
       harness.open();
 
-      CountDownLatch latch = new CountDownLatch(1);
-      TestSourceContext sourceContext = new TestSourceContext(latch);
+      TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
       runSourceFunctionInTask(sourceContext, newFunc);
 
       awaitExpectedSplits(sourceContext);
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
index 7186db21f7..70e7a79d83 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iceberg.flink.source;
 
-import static org.apache.iceberg.flink.SimpleDataUtil.assertTableRecords;
-
 import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -46,6 +44,7 @@ import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.FlinkConfigOptions;
 import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.sink.FlinkSink;
 import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
@@ -151,7 +150,8 @@ public class TestIcebergSourceFailover {
         RecordCounterToFail::continueProcessing,
         miniClusterResource.getMiniCluster());
 
-    assertTableRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
+    SimpleDataUtil.assertTableRecords(
+        sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
   }
 
   @Test
@@ -214,7 +214,8 @@ public class TestIcebergSourceFailover {
 
     // wait longer for continuous source to reduce flakiness
     // because CI servers tend to be overloaded.
-    assertTableRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
+    SimpleDataUtil.assertTableRecords(
+        sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
   }
 
   // ------------------------------------------------------------------------
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
index 0c3f54cc72..6d1891baf5 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
@@ -109,8 +109,7 @@ public class TestStreamingMonitorFunction extends TableTestBase {
       harness.setup();
       harness.open();
 
-      CountDownLatch latch = new CountDownLatch(1);
-      TestSourceContext sourceContext = new TestSourceContext(latch);
+      TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
       runSourceFunctionInTask(sourceContext, function);
 
       awaitExpectedSplits(sourceContext);
@@ -143,8 +142,7 @@ public class TestStreamingMonitorFunction extends TableTestBase {
       harness.setup();
       harness.open();
 
-      CountDownLatch latch = new CountDownLatch(1);
-      TestSourceContext sourceContext = new TestSourceContext(latch);
+      TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
       runSourceFunctionInTask(sourceContext, function);
 
       awaitExpectedSplits(sourceContext);
@@ -176,8 +174,7 @@ public class TestStreamingMonitorFunction extends TableTestBase {
       harness.setup();
       harness.open();
 
-      CountDownLatch latch = new CountDownLatch(1);
-      TestSourceContext sourceContext = new TestSourceContext(latch);
+      TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
       runSourceFunctionInTask(sourceContext, function);
 
       awaitExpectedSplits(sourceContext);
@@ -201,8 +198,7 @@ public class TestStreamingMonitorFunction extends TableTestBase {
       harness.setup();
       harness.open();
 
-      CountDownLatch latch = new CountDownLatch(1);
-      TestSourceContext sourceContext = new TestSourceContext(latch);
+      TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
       runSourceFunctionInTask(sourceContext, func);
 
       awaitExpectedSplits(sourceContext);
@@ -224,8 +220,7 @@ public class TestStreamingMonitorFunction extends TableTestBase {
       harness.initializeState(state);
       harness.open();
 
-      CountDownLatch latch = new CountDownLatch(1);
-      TestSourceContext sourceContext = new TestSourceContext(latch);
+      TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
       runSourceFunctionInTask(sourceContext, newFunc);
 
       awaitExpectedSplits(sourceContext);

Reply via email to