This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6a00e0cea8 [INLONG-9034][Sort] Fix sort redis test with incorrect use of sleep (#9327)
6a00e0cea8 is described below
commit 6a00e0cea8853f0faa6e99cfaccd1a648c51cea9
Author: Sting <[email protected]>
AuthorDate: Fri Nov 24 10:29:12 2023 +0800
[INLONG-9034][Sort] Fix sort redis test with incorrect use of sleep (#9327)
---
.../sort-flink-v1.13/sort-connectors/redis/pom.xml | 4 ++
.../apache/inlong/sort/redis/RedisTableTest.java | 83 +++++++++++-----------
2 files changed, 44 insertions(+), 43 deletions(-)
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/redis/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/redis/pom.xml
index 81af253242..fa6c4e7c2a 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/redis/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/redis/pom.xml
@@ -120,6 +120,10 @@
<artifactId>flink-clients_2.11</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/redis/src/test/java/org/apache/inlong/sort/redis/RedisTableTest.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/redis/src/test/java/org/apache/inlong/sort/redis/RedisTableTest.java
index 4f393c5652..ec869e3eef 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/redis/src/test/java/org/apache/inlong/sort/redis/RedisTableTest.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/redis/src/test/java/org/apache/inlong/sort/redis/RedisTableTest.java
@@ -30,7 +30,9 @@ import redis.clients.jedis.Jedis;
import redis.embedded.RedisServer;
import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -64,7 +66,7 @@ public class RedisTableTest {
}
@Test
- public void testSinkWithPlain() throws Exception {
+ public void testSinkWithPlain() {
StreamExecutionEnvironment executionEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv =
@@ -72,8 +74,6 @@ public class RedisTableTest {
executionEnv.setParallelism(1);
- String address = "localhost:" + redisPort;
-
DataStream<Row> source =
executionEnv.fromCollection(
Arrays.asList(
@@ -109,15 +109,16 @@ public class RedisTableTest {
String query = "INSERT INTO sink SELECT * FROM source";
tableEnv.executeSql(query);
- Thread.sleep(4000);
+ await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+ assertEquals("r12,1.2,1", jedis.get("1"));
+ assertEquals("r22,2.2,2", jedis.get("2"));
+ assertEquals("r32,3.2,3", jedis.get("3"));
+ });
- assertEquals("r12,1.2,1", jedis.get("1"));
- assertEquals("r22,2.2,2", jedis.get("2"));
- assertEquals("r32,3.2,3", jedis.get("3"));
}
@Test
- public void testSinkWithHashPrefixMatch() throws Exception {
+ public void testSinkWithHashPrefixMatch() {
StreamExecutionEnvironment executionEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv =
@@ -125,8 +126,6 @@ public class RedisTableTest {
executionEnv.setParallelism(1);
- String address = "localhost:" + redisPort;
-
DataStream<Row> source =
executionEnv.fromCollection(
Arrays.asList(
@@ -162,15 +161,16 @@ public class RedisTableTest {
String query = "INSERT INTO sink SELECT * FROM source";
tableEnv.executeSql(query);
- Thread.sleep(4000);
+ await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+ assertEquals("1.2,1", jedis.hget("1", "r12"));
+ assertEquals("2.2,2", jedis.hget("2", "r22"));
+ assertEquals("3.2,3", jedis.hget("3", "r32"));
+ });
- assertEquals("1.2,1", jedis.hget("1", "r12"));
- assertEquals("2.2,2", jedis.hget("2", "r22"));
- assertEquals("3.2,3", jedis.hget("3", "r32"));
}
@Test
- public void testSinkWithHashKvPair() throws Exception {
+ public void testSinkWithHashKvPair() {
StreamExecutionEnvironment executionEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv =
@@ -178,8 +178,6 @@ public class RedisTableTest {
executionEnv.setParallelism(1);
- String address = "localhost:" + redisPort;
-
DataStream<Row> source =
executionEnv.fromCollection(
Arrays.asList(
@@ -217,18 +215,19 @@ public class RedisTableTest {
String query = "INSERT INTO sink SELECT aaa,bbb,cast(ccc as STRING),ddd, cast(eee as STRING) FROM source";
tableEnv.executeSql(query);
- Thread.sleep(4000);
+ await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+ assertEquals("1.2", jedis.hget("1", "r12"));
+ assertEquals("2.2", jedis.hget("2", "r22"));
+ assertEquals("3.2", jedis.hget("3", "r32"));
+ assertEquals("1", jedis.hget("1", "r14"));
+ assertEquals("2", jedis.hget("2", "r24"));
+ assertEquals("3", jedis.hget("3", "r34"));
+ });
- assertEquals("1.2", jedis.hget("1", "r12"));
- assertEquals("2.2", jedis.hget("2", "r22"));
- assertEquals("3.2", jedis.hget("3", "r32"));
- assertEquals("1", jedis.hget("1", "r14"));
- assertEquals("2", jedis.hget("2", "r24"));
- assertEquals("3", jedis.hget("3", "r34"));
}
@Test
- public void testSinkWithDynamic() throws Exception {
+ public void testSinkWithDynamic() {
StreamExecutionEnvironment executionEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv =
@@ -236,8 +235,6 @@ public class RedisTableTest {
executionEnv.setParallelism(1);
- String address = "localhost:" + redisPort;
-
DataStream<Row> source =
executionEnv.fromCollection(
Arrays.asList(
@@ -276,14 +273,15 @@ public class RedisTableTest {
+ "FROM source";
tableEnv.executeSql(query);
- Thread.sleep(4000);
+ await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+ assertEquals("1.2", jedis.hget("1", "r12"));
+ assertEquals("2.2", jedis.hget("2", "r22"));
+ assertEquals("3.2", jedis.hget("3", "r32"));
+ assertEquals("1", jedis.hget("1", "r14"));
+ assertEquals("2", jedis.hget("2", "r24"));
+ assertEquals("3", jedis.hget("3", "r34"));
+ });
- assertEquals("1.2", jedis.hget("1", "r12"));
- assertEquals("2.2", jedis.hget("2", "r22"));
- assertEquals("3.2", jedis.hget("3", "r32"));
- assertEquals("1", jedis.hget("1", "r14"));
- assertEquals("2", jedis.hget("2", "r24"));
- assertEquals("3", jedis.hget("3", "r34"));
}
@Test
@@ -295,8 +293,6 @@ public class RedisTableTest {
executionEnv.setParallelism(1);
- String address = "localhost:" + redisPort;
-
DataStream<Row> source =
executionEnv.fromCollection(
Arrays.asList(
@@ -343,14 +339,15 @@ public class RedisTableTest {
Thread.sleep(4000);
- assertTrue(jedis.getbit("1", 2));
- assertTrue(jedis.getbit("1", 4));
-
- assertFalse(jedis.getbit("2", 2));
- assertFalse(jedis.getbit("2", 4));
+ await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+ assertTrue(jedis.getbit("1", 2));
+ assertTrue(jedis.getbit("1", 4));
+ assertFalse(jedis.getbit("2", 2));
+ assertFalse(jedis.getbit("2", 4));
+ assertFalse(jedis.getbit("3", 2));
+ assertFalse(jedis.getbit("3", 4));
+ });
- assertFalse(jedis.getbit("3", 2));
- assertFalse(jedis.getbit("3", 4));
}
}