This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new d581e79d39 Flink 1.16: Use Awaitility instead of Thread.sleep() (#8880)
d581e79d39 is described below

commit d581e79d39b3d2c00e8b70f9921be823bd6434f5
Author: Naveen Kumar <[email protected]>
AuthorDate: Thu Oct 19 20:23:41 2023 +0530

    Flink 1.16: Use Awaitility instead of Thread.sleep() (#8880)
---
 .../org/apache/iceberg/flink/SimpleDataUtil.java   | 25 ++++++--------
 .../flink/source/TestIcebergSourceContinuous.java  | 12 ++++---
 .../flink/source/TestIcebergSourceFailover.java    | 13 +++-----
 .../flink/source/TestStreamingMonitorFunction.java | 39 ++++++++++------------
 4 files changed, 41 insertions(+), 48 deletions(-)

diff --git 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index 5efb7413e7..f48764f772 100644
--- 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -70,6 +70,7 @@ import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.StructLikeSet;
 import org.apache.iceberg.util.StructLikeWrapper;
+import org.awaitility.Awaitility;
 import org.junit.Assert;
 
 public class SimpleDataUtil {
@@ -277,21 +278,17 @@ public class SimpleDataUtil {
   }
 
   /**
-   * Assert table contains the expected list of records after waiting up to 
{@code maxCheckCount}
-   * with {@code checkInterval}
+   * Assert table contains the expected list of records after waiting up to 
the configured {@code
+   * timeout}
    */
-  public static void assertTableRecords(
-      Table table, List<Record> expected, Duration checkInterval, int 
maxCheckCount)
-      throws IOException, InterruptedException {
-    for (int i = 0; i < maxCheckCount; ++i) {
-      if (equalsRecords(expected, tableRecords(table), table.schema())) {
-        break;
-      } else {
-        Thread.sleep(checkInterval.toMillis());
-      }
-    }
-    // success or failure, assert on the latest table state
-    assertRecordsEqual(expected, tableRecords(table), table.schema());
+  public static void assertTableRecords(Table table, List<Record> expected, 
Duration timeout) {
+    Awaitility.await("expected list of records should be produced")
+        .atMost(timeout)
+        .untilAsserted(
+            () -> {
+              equalsRecords(expected, tableRecords(table), table.schema());
+              assertRecordsEqual(expected, tableRecords(table), 
table.schema());
+            });
   }
 
   public static void assertTableRecords(Table table, List<Record> expected) 
throws IOException {
diff --git 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
index 6d26f933b3..31e9733fcd 100644
--- 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
+++ 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.flink.source;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.time.Duration;
 import java.util.Collection;
 import java.util.List;
@@ -47,6 +49,7 @@ import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.flink.data.RowDataToRowMapper;
 import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.awaitility.Awaitility;
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -401,10 +404,11 @@ public class TestIcebergSourceContinuous {
     return results;
   }
 
-  public static void waitUntilJobIsRunning(ClusterClient<?> client) throws 
Exception {
-    while (getRunningJobs(client).isEmpty()) {
-      Thread.sleep(10);
-    }
+  public static void waitUntilJobIsRunning(ClusterClient<?> client) {
+    Awaitility.await("job should be running")
+        .atMost(Duration.ofSeconds(30))
+        .pollInterval(Duration.ofMillis(10))
+        .untilAsserted(() -> assertThat(getRunningJobs(client)).isNotEmpty());
   }
 
   public static List<JobID> getRunningJobs(ClusterClient<?> client) throws 
Exception {
diff --git 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
index cad1fa67ae..7186db21f7 100644
--- 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
+++ 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.flink.source;
 
+import static org.apache.iceberg.flink.SimpleDataUtil.assertTableRecords;
+
 import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -39,13 +41,11 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.Table;
 import org.apache.iceberg.data.GenericAppenderHelper;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.FlinkConfigOptions;
 import org.apache.iceberg.flink.HadoopTableResource;
-import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.sink.FlinkSink;
 import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
@@ -98,11 +98,6 @@ public class TestIcebergSourceFailover {
     return RandomGenericData.generate(schema(), numRecords, seed);
   }
 
-  protected void assertRecords(
-      Table table, List<Record> expectedRecords, Duration interval, int 
maxCount) throws Exception {
-    SimpleDataUtil.assertTableRecords(table, expectedRecords, interval, 
maxCount);
-  }
-
   @Test
   public void testBoundedWithTaskManagerFailover() throws Exception {
     testBoundedIcebergSource(FailoverType.TM);
@@ -156,7 +151,7 @@ public class TestIcebergSourceFailover {
         RecordCounterToFail::continueProcessing,
         miniClusterResource.getMiniCluster());
 
-    assertRecords(sinkTableResource.table(), expectedRecords, 
Duration.ofMillis(10), 12000);
+    assertTableRecords(sinkTableResource.table(), expectedRecords, 
Duration.ofSeconds(120));
   }
 
   @Test
@@ -219,7 +214,7 @@ public class TestIcebergSourceFailover {
 
     // wait longer for continuous source to reduce flakiness
     // because CI servers tend to be overloaded.
-    assertRecords(sinkTableResource.table(), expectedRecords, 
Duration.ofMillis(10), 12000);
+    assertTableRecords(sinkTableResource.table(), expectedRecords, 
Duration.ofSeconds(120));
   }
 
   // ------------------------------------------------------------------------
diff --git 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
index a161645979..65e2b1864f 100644
--- 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
+++ 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
@@ -18,12 +18,13 @@
  */
 package org.apache.iceberg.flink.source;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
@@ -47,6 +48,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.ThreadPools;
+import org.awaitility.Awaitility;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -111,14 +113,11 @@ public class TestStreamingMonitorFunction extends 
TableTestBase {
       TestSourceContext sourceContext = new TestSourceContext(latch);
       runSourceFunctionInTask(sourceContext, function);
 
-      Assert.assertTrue(
-          "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, 
TimeUnit.MILLISECONDS));
-      Thread.sleep(1000L);
+      awaitExpectedSplits(sourceContext);
 
       // Stop the stream task.
       function.close();
 
-      Assert.assertEquals("Should produce the expected splits", 1, 
sourceContext.splits.size());
       TestHelpers.assertRecords(
           sourceContext.toRows(), 
Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA);
     }
@@ -148,14 +147,11 @@ public class TestStreamingMonitorFunction extends 
TableTestBase {
       TestSourceContext sourceContext = new TestSourceContext(latch);
       runSourceFunctionInTask(sourceContext, function);
 
-      Assert.assertTrue(
-          "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, 
TimeUnit.MILLISECONDS));
-      Thread.sleep(1000L);
+      awaitExpectedSplits(sourceContext);
 
       // Stop the stream task.
       function.close();
 
-      Assert.assertEquals("Should produce the expected splits", 1, 
sourceContext.splits.size());
       TestHelpers.assertRecords(
           sourceContext.toRows(), 
Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA);
     }
@@ -184,14 +180,11 @@ public class TestStreamingMonitorFunction extends 
TableTestBase {
       TestSourceContext sourceContext = new TestSourceContext(latch);
       runSourceFunctionInTask(sourceContext, function);
 
-      Assert.assertTrue(
-          "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, 
TimeUnit.MILLISECONDS));
-      Thread.sleep(1000L);
+      awaitExpectedSplits(sourceContext);
 
       // Stop the stream task.
       function.close();
 
-      Assert.assertEquals("Should produce the expected splits", 1, 
sourceContext.splits.size());
       TestHelpers.assertRecords(
           sourceContext.toRows(), 
Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA);
     }
@@ -212,16 +205,13 @@ public class TestStreamingMonitorFunction extends 
TableTestBase {
       TestSourceContext sourceContext = new TestSourceContext(latch);
       runSourceFunctionInTask(sourceContext, func);
 
-      Assert.assertTrue(
-          "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, 
TimeUnit.MILLISECONDS));
-      Thread.sleep(1000L);
+      awaitExpectedSplits(sourceContext);
 
       state = harness.snapshot(1, 1);
 
       // Stop the stream task.
       func.close();
 
-      Assert.assertEquals("Should produce the expected splits", 1, 
sourceContext.splits.size());
       TestHelpers.assertRecords(
           sourceContext.toRows(), 
Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA);
     }
@@ -238,19 +228,26 @@ public class TestStreamingMonitorFunction extends 
TableTestBase {
       TestSourceContext sourceContext = new TestSourceContext(latch);
       runSourceFunctionInTask(sourceContext, newFunc);
 
-      Assert.assertTrue(
-          "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, 
TimeUnit.MILLISECONDS));
-      Thread.sleep(1000L);
+      awaitExpectedSplits(sourceContext);
 
       // Stop the stream task.
       newFunc.close();
 
-      Assert.assertEquals("Should produce the expected splits", 1, 
sourceContext.splits.size());
       TestHelpers.assertRecords(
           sourceContext.toRows(), 
Lists.newArrayList(Iterables.concat(newRecordsList)), SCHEMA);
     }
   }
 
+  private void awaitExpectedSplits(TestSourceContext sourceContext) {
+    Awaitility.await("expected splits should be produced")
+        .atMost(Duration.ofMillis(WAIT_TIME_MILLIS))
+        .untilAsserted(
+            () -> {
+              assertThat(sourceContext.latch.getCount()).isEqualTo(0);
+              assertThat(sourceContext.splits).as("Should produce the expected 
splits").hasSize(1);
+            });
+  }
+
   @Test
   public void testInvalidMaxPlanningSnapshotCount() {
     ScanContext scanContext1 =

Reply via email to