This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new d581e79d39 Flink 1.16: Use Awaitility instead of Thread.sleep() (#8880)
d581e79d39 is described below
commit d581e79d39b3d2c00e8b70f9921be823bd6434f5
Author: Naveen Kumar <[email protected]>
AuthorDate: Thu Oct 19 20:23:41 2023 +0530
Flink 1.16: Use Awaitility instead of Thread.sleep() (#8880)
---
.../org/apache/iceberg/flink/SimpleDataUtil.java | 25 ++++++--------
.../flink/source/TestIcebergSourceContinuous.java | 12 ++++---
.../flink/source/TestIcebergSourceFailover.java | 13 +++-----
.../flink/source/TestStreamingMonitorFunction.java | 39 ++++++++++------------
4 files changed, 41 insertions(+), 48 deletions(-)
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index 5efb7413e7..f48764f772 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -70,6 +70,7 @@ import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.iceberg.util.StructLikeWrapper;
+import org.awaitility.Awaitility;
import org.junit.Assert;
public class SimpleDataUtil {
@@ -277,21 +278,17 @@ public class SimpleDataUtil {
}
   /**
-   * Assert table contains the expected list of records after waiting up to {@code maxCheckCount}
-   * with {@code checkInterval}
+   * Assert table contains the expected list of records after waiting up to the configured {@code
+   * timeout}
    */
-  public static void assertTableRecords(
-      Table table, List<Record> expected, Duration checkInterval, int maxCheckCount)
-      throws IOException, InterruptedException {
-    for (int i = 0; i < maxCheckCount; ++i) {
-      if (equalsRecords(expected, tableRecords(table), table.schema())) {
-        break;
-      } else {
-        Thread.sleep(checkInterval.toMillis());
-      }
-    }
-    // success or failure, assert on the latest table state
-    assertRecordsEqual(expected, tableRecords(table), table.schema());
+  public static void assertTableRecords(Table table, List<Record> expected, Duration timeout) {
+    Awaitility.await("expected list of records should be produced")
+        .atMost(timeout)
+        .untilAsserted(
+            () -> {
+              equalsRecords(expected, tableRecords(table), table.schema());
+              assertRecordsEqual(expected, tableRecords(table), table.schema());
+            });
}
   public static void assertTableRecords(Table table, List<Record> expected) throws IOException {
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
index 6d26f933b3..31e9733fcd 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.flink.source;
+import static org.assertj.core.api.Assertions.assertThat;
+
import java.time.Duration;
import java.util.Collection;
import java.util.List;
@@ -47,6 +49,7 @@ import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.flink.data.RowDataToRowMapper;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
@@ -401,10 +404,11 @@ public class TestIcebergSourceContinuous {
return results;
}
-  public static void waitUntilJobIsRunning(ClusterClient<?> client) throws Exception {
-    while (getRunningJobs(client).isEmpty()) {
-      Thread.sleep(10);
-    }
+  public static void waitUntilJobIsRunning(ClusterClient<?> client) {
+    Awaitility.await("job should be running")
+        .atMost(Duration.ofSeconds(30))
+        .pollInterval(Duration.ofMillis(10))
+        .untilAsserted(() -> assertThat(getRunningJobs(client)).isNotEmpty());
   }
   public static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
index cad1fa67ae..7186db21f7 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.flink.source;
+import static org.apache.iceberg.flink.SimpleDataUtil.assertTableRecords;
+
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -39,13 +41,11 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.HadoopTableResource;
-import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
@@ -98,11 +98,6 @@ public class TestIcebergSourceFailover {
return RandomGenericData.generate(schema(), numRecords, seed);
}
-  protected void assertRecords(
-      Table table, List<Record> expectedRecords, Duration interval, int maxCount) throws Exception {
-    SimpleDataUtil.assertTableRecords(table, expectedRecords, interval, maxCount);
-  }
-
@Test
public void testBoundedWithTaskManagerFailover() throws Exception {
testBoundedIcebergSource(FailoverType.TM);
@@ -156,7 +151,7 @@ public class TestIcebergSourceFailover {
RecordCounterToFail::continueProcessing,
miniClusterResource.getMiniCluster());
-    assertRecords(sinkTableResource.table(), expectedRecords, Duration.ofMillis(10), 12000);
+    assertTableRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
}
@Test
@@ -219,7 +214,7 @@ public class TestIcebergSourceFailover {
// wait longer for continuous source to reduce flakiness
// because CI servers tend to be overloaded.
-    assertRecords(sinkTableResource.table(), expectedRecords, Duration.ofMillis(10), 12000);
+    assertTableRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
}
// ------------------------------------------------------------------------
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
index a161645979..65e2b1864f 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
@@ -18,12 +18,13 @@
*/
package org.apache.iceberg.flink.source;
+import static org.assertj.core.api.Assertions.assertThat;
+
import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
@@ -47,6 +48,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.ThreadPools;
+import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -111,14 +113,11 @@ public class TestStreamingMonitorFunction extends TableTestBase {
TestSourceContext sourceContext = new TestSourceContext(latch);
runSourceFunctionInTask(sourceContext, function);
-    Assert.assertTrue(
-        "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
-    Thread.sleep(1000L);
+    awaitExpectedSplits(sourceContext);
// Stop the stream task.
function.close();
-    Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size());
TestHelpers.assertRecords(
sourceContext.toRows(),
Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA);
}
@@ -148,14 +147,11 @@ public class TestStreamingMonitorFunction extends TableTestBase {
TestSourceContext sourceContext = new TestSourceContext(latch);
runSourceFunctionInTask(sourceContext, function);
-    Assert.assertTrue(
-        "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
-    Thread.sleep(1000L);
+    awaitExpectedSplits(sourceContext);
// Stop the stream task.
function.close();
-    Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size());
TestHelpers.assertRecords(
sourceContext.toRows(),
Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA);
}
@@ -184,14 +180,11 @@ public class TestStreamingMonitorFunction extends TableTestBase {
TestSourceContext sourceContext = new TestSourceContext(latch);
runSourceFunctionInTask(sourceContext, function);
-    Assert.assertTrue(
-        "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
-    Thread.sleep(1000L);
+    awaitExpectedSplits(sourceContext);
// Stop the stream task.
function.close();
-    Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size());
TestHelpers.assertRecords(
sourceContext.toRows(),
Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA);
}
@@ -212,16 +205,13 @@ public class TestStreamingMonitorFunction extends TableTestBase {
TestSourceContext sourceContext = new TestSourceContext(latch);
runSourceFunctionInTask(sourceContext, func);
-    Assert.assertTrue(
-        "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
-    Thread.sleep(1000L);
+    awaitExpectedSplits(sourceContext);
state = harness.snapshot(1, 1);
// Stop the stream task.
func.close();
-    Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size());
TestHelpers.assertRecords(
sourceContext.toRows(),
Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA);
}
@@ -238,19 +228,26 @@ public class TestStreamingMonitorFunction extends TableTestBase {
TestSourceContext sourceContext = new TestSourceContext(latch);
runSourceFunctionInTask(sourceContext, newFunc);
-    Assert.assertTrue(
-        "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
-    Thread.sleep(1000L);
+    awaitExpectedSplits(sourceContext);
// Stop the stream task.
newFunc.close();
-    Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size());
TestHelpers.assertRecords(
sourceContext.toRows(),
Lists.newArrayList(Iterables.concat(newRecordsList)), SCHEMA);
}
}
+  private void awaitExpectedSplits(TestSourceContext sourceContext) {
+    Awaitility.await("expected splits should be produced")
+        .atMost(Duration.ofMillis(WAIT_TIME_MILLIS))
+        .untilAsserted(
+            () -> {
+              assertThat(sourceContext.latch.getCount()).isEqualTo(0);
+              assertThat(sourceContext.splits).as("Should produce the expected splits").hasSize(1);
+            });
+  }
+
@Test
public void testInvalidMaxPlanningSnapshotCount() {
ScanContext scanContext1 =