This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 10a856e0f4 Flink: Proper backport for #8852 (#9146)
10a856e0f4 is described below
commit 10a856e0f4e6bd1653ed66a5ba232277834b9a8d
Author: pvary <[email protected]>
AuthorDate: Fri Nov 24 17:57:04 2023 +0100
Flink: Proper backport for #8852 (#9146)
---
.../iceberg/flink/source/TestIcebergSourceFailover.java | 9 +++++----
.../flink/source/TestStreamingMonitorFunction.java | 16 ++++++----------
.../iceberg/flink/source/TestIcebergSourceFailover.java | 9 +++++----
.../flink/source/TestStreamingMonitorFunction.java | 15 +++++----------
4 files changed, 21 insertions(+), 28 deletions(-)
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
index 7186db21f7..70e7a79d83 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
@@ -18,8 +18,6 @@
*/
package org.apache.iceberg.flink.source;
-import static org.apache.iceberg.flink.SimpleDataUtil.assertTableRecords;
-
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -46,6 +44,7 @@ import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
@@ -151,7 +150,8 @@ public class TestIcebergSourceFailover {
RecordCounterToFail::continueProcessing,
miniClusterResource.getMiniCluster());
- assertTableRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
+ SimpleDataUtil.assertTableRecords(
+ sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
}
@Test
@@ -214,7 +214,8 @@ public class TestIcebergSourceFailover {
// wait longer for continuous source to reduce flakiness
// because CI servers tend to be overloaded.
- assertTableRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
+ SimpleDataUtil.assertTableRecords(
+ sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
}
// ------------------------------------------------------------------------
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
index 132bbd7b2a..8af1dd883f 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
@@ -109,8 +109,7 @@ public class TestStreamingMonitorFunction extends TableTestBase {
harness.setup();
harness.open();
- CountDownLatch latch = new CountDownLatch(1);
- TestSourceContext sourceContext = new TestSourceContext(latch);
+ TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
runSourceFunctionInTask(sourceContext, function);
awaitExpectedSplits(sourceContext);
@@ -143,8 +142,7 @@ public class TestStreamingMonitorFunction extends TableTestBase {
harness.setup();
harness.open();
- CountDownLatch latch = new CountDownLatch(1);
- TestSourceContext sourceContext = new TestSourceContext(latch);
+ TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
runSourceFunctionInTask(sourceContext, function);
awaitExpectedSplits(sourceContext);
@@ -176,11 +174,11 @@ public class TestStreamingMonitorFunction extends TableTestBase {
harness.setup();
harness.open();
- CountDownLatch latch = new CountDownLatch(1);
- TestSourceContext sourceContext = new TestSourceContext(latch);
+ TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
runSourceFunctionInTask(sourceContext, function);
awaitExpectedSplits(sourceContext);
+
// Stop the stream task.
function.close();
@@ -200,8 +198,7 @@ public class TestStreamingMonitorFunction extends TableTestBase {
harness.setup();
harness.open();
- CountDownLatch latch = new CountDownLatch(1);
- TestSourceContext sourceContext = new TestSourceContext(latch);
+ TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
runSourceFunctionInTask(sourceContext, func);
awaitExpectedSplits(sourceContext);
@@ -223,8 +220,7 @@ public class TestStreamingMonitorFunction extends TableTestBase {
harness.initializeState(state);
harness.open();
- CountDownLatch latch = new CountDownLatch(1);
- TestSourceContext sourceContext = new TestSourceContext(latch);
+ TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
runSourceFunctionInTask(sourceContext, newFunc);
awaitExpectedSplits(sourceContext);
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
index 7186db21f7..70e7a79d83 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
@@ -18,8 +18,6 @@
*/
package org.apache.iceberg.flink.source;
-import static org.apache.iceberg.flink.SimpleDataUtil.assertTableRecords;
-
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -46,6 +44,7 @@ import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
@@ -151,7 +150,8 @@ public class TestIcebergSourceFailover {
RecordCounterToFail::continueProcessing,
miniClusterResource.getMiniCluster());
- assertTableRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
+ SimpleDataUtil.assertTableRecords(
+ sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
}
@Test
@@ -214,7 +214,8 @@ public class TestIcebergSourceFailover {
// wait longer for continuous source to reduce flakiness
// because CI servers tend to be overloaded.
- assertTableRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
+ SimpleDataUtil.assertTableRecords(
+ sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
}
// ------------------------------------------------------------------------
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
index 0c3f54cc72..6d1891baf5 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
@@ -109,8 +109,7 @@ public class TestStreamingMonitorFunction extends TableTestBase {
harness.setup();
harness.open();
- CountDownLatch latch = new CountDownLatch(1);
- TestSourceContext sourceContext = new TestSourceContext(latch);
+ TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
runSourceFunctionInTask(sourceContext, function);
awaitExpectedSplits(sourceContext);
@@ -143,8 +142,7 @@ public class TestStreamingMonitorFunction extends TableTestBase {
harness.setup();
harness.open();
- CountDownLatch latch = new CountDownLatch(1);
- TestSourceContext sourceContext = new TestSourceContext(latch);
+ TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
runSourceFunctionInTask(sourceContext, function);
awaitExpectedSplits(sourceContext);
@@ -176,8 +174,7 @@ public class TestStreamingMonitorFunction extends TableTestBase {
harness.setup();
harness.open();
- CountDownLatch latch = new CountDownLatch(1);
- TestSourceContext sourceContext = new TestSourceContext(latch);
+ TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
runSourceFunctionInTask(sourceContext, function);
awaitExpectedSplits(sourceContext);
@@ -201,8 +198,7 @@ public class TestStreamingMonitorFunction extends TableTestBase {
harness.setup();
harness.open();
- CountDownLatch latch = new CountDownLatch(1);
- TestSourceContext sourceContext = new TestSourceContext(latch);
+ TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
runSourceFunctionInTask(sourceContext, func);
awaitExpectedSplits(sourceContext);
@@ -224,8 +220,7 @@ public class TestStreamingMonitorFunction extends TableTestBase {
harness.initializeState(state);
harness.open();
- CountDownLatch latch = new CountDownLatch(1);
- TestSourceContext sourceContext = new TestSourceContext(latch);
+ TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
runSourceFunctionInTask(sourceContext, newFunc);
awaitExpectedSplits(sourceContext);