This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3608ef838d [Improve][E2E] Improve Druid E2E Case (#8077)
3608ef838d is described below
commit 3608ef838d74d7cc208cc6f31af090944ae03db9
Author: zhangdonghao <[email protected]>
AuthorDate: Mon Nov 18 18:27:02 2024 +0800
[Improve][E2E] Improve Druid E2E Case (#8077)
---
.../seatunnel/e2e/connector/druid/DruidIT.java | 95 ++++++++++++----------
1 file changed, 51 insertions(+), 44 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/java/org/apache/seatunnel/e2e/connector/druid/DruidIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/java/org/apache/seatunnel/e2e/connector/druid/DruidIT.java
index 21463e4917..ab8945e3e1 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/java/org/apache/seatunnel/e2e/connector/druid/DruidIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/java/org/apache/seatunnel/e2e/connector/druid/DruidIT.java
@@ -51,8 +51,11 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import static org.awaitility.Awaitility.given;
+
@Slf4j
@DisabledOnContainer(
value = {TestContainerId.SPARK_2_4},
@@ -94,27 +97,28 @@ public class DruidIT extends TestSuiteBase implements
TestResource {
public void testDruidSink(TestContainer container) throws Exception {
Container.ExecResult execResult =
container.executeJob("/fakesource_to_druid.conf");
Assertions.assertEquals(0, execResult.getExitCode());
- while (true) {
- String responseBody = getSelectResponse(DATASOURCE);
- String expectedDataRow1 =
-
"\"c_boolean\":\"true\",\"c_timestamp\":\"2020-02-02T02:02:02\",\"c_string\":\"NEW\",\"c_tinyint\":1,\"c_smallint\":2,\"c_int\":3,\"c_bigint\":4,\"c_float\":4.3,\"c_double\":5.3,\"c_decimal\":6.3";
- String expectedDataRow2 =
-
"\"c_boolean\":\"false\",\"c_timestamp\":\"2012-12-21T12:34:56\",\"c_string\":\"AAA\",\"c_tinyint\":1,\"c_smallint\":1,\"c_int\":333,\"c_bigint\":323232,\"c_float\":3.1,\"c_double\":9.33333,\"c_decimal\":99999.99999999";
- String expectedDataRow3 =
-
"\"c_boolean\":\"true\",\"c_timestamp\":\"2016-03-12T11:29:33\",\"c_string\":\"BBB\",\"c_tinyint\":1,\"c_smallint\":2,\"c_int\":672,\"c_bigint\":546782,\"c_float\":7.9,\"c_double\":6.88888,\"c_decimal\":88888.45623489";
- String expectedDataRow4 =
-
"\"c_boolean\":\"false\",\"c_timestamp\":\"2014-04-28T09:13:27\",\"c_string\":\"CCC\",\"c_tinyint\":1,\"c_smallint\":1,\"c_int\":271,\"c_bigint\":683221,\"c_float\":4.8,\"c_double\":4.45271,\"c_decimal\":79277.68219012";
-
- if (!responseBody.contains("errorMessage")) {
- // Check sink data
-
Assertions.assertEquals(responseBody.contains(expectedDataRow1), true);
-
Assertions.assertEquals(responseBody.contains(expectedDataRow2), true);
-
Assertions.assertEquals(responseBody.contains(expectedDataRow3), true);
-
Assertions.assertEquals(responseBody.contains(expectedDataRow4), true);
- break;
- }
- Thread.sleep(1000);
- }
+
+ given().ignoreExceptions()
+ .await()
+ .atLeast(100L, TimeUnit.MILLISECONDS)
+ .atMost(400L, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ String responseBody =
getSelectResponse(DATASOURCE);
+ String expectedDataRow1 =
+
"\"c_boolean\":\"true\",\"c_timestamp\":\"2020-02-02T02:02:02\",\"c_string\":\"NEW\",\"c_tinyint\":1,\"c_smallint\":2,\"c_int\":3,\"c_bigint\":4,\"c_float\":4.3,\"c_double\":5.3,\"c_decimal\":6.3";
+ String expectedDataRow2 =
+
"\"c_boolean\":\"false\",\"c_timestamp\":\"2012-12-21T12:34:56\",\"c_string\":\"AAA\",\"c_tinyint\":1,\"c_smallint\":1,\"c_int\":333,\"c_bigint\":323232,\"c_float\":3.1,\"c_double\":9.33333,\"c_decimal\":99999.99999999";
+ String expectedDataRow3 =
+
"\"c_boolean\":\"true\",\"c_timestamp\":\"2016-03-12T11:29:33\",\"c_string\":\"BBB\",\"c_tinyint\":1,\"c_smallint\":2,\"c_int\":672,\"c_bigint\":546782,\"c_float\":7.9,\"c_double\":6.88888,\"c_decimal\":88888.45623489";
+ String expectedDataRow4 =
+
"\"c_boolean\":\"false\",\"c_timestamp\":\"2014-04-28T09:13:27\",\"c_string\":\"CCC\",\"c_tinyint\":1,\"c_smallint\":1,\"c_int\":271,\"c_bigint\":683221,\"c_float\":4.8,\"c_double\":4.45271,\"c_decimal\":79277.68219012";
+
Assertions.assertFalse(responseBody.contains("errorMessage"));
+
Assertions.assertTrue(responseBody.contains(expectedDataRow1));
+
Assertions.assertTrue(responseBody.contains(expectedDataRow2));
+
Assertions.assertTrue(responseBody.contains(expectedDataRow3));
+
Assertions.assertTrue(responseBody.contains(expectedDataRow4));
+ });
}
@DisabledOnContainer(
@@ -127,28 +131,32 @@ public class DruidIT extends TestSuiteBase implements
TestResource {
container.executeJob("/fakesource_to_druid_with_multi.conf");
Assertions.assertEquals(0, execResult.getExitCode());
// Check multi sink table 1
- while (true) {
- String responseBody = getSelectResponse(MULTI_DATASOURCE_1);
- String expectedDataRow =
-
"\"id\":1,\"val_bool\":\"true\",\"val_tinyint\":1,\"val_smallint\":2,\"val_int\":3,\"val_bigint\":4,\"val_float\":4.3,\"val_double\":5.3,\"val_decimal\":6.3,\"val_string\":\"NEW\"";
-
- if (!responseBody.contains("errorMessage")) {
-
Assertions.assertEquals(responseBody.contains(expectedDataRow), true);
- break;
- }
- Thread.sleep(1000);
- }
+ given().ignoreExceptions()
+ .await()
+ .atLeast(100L, TimeUnit.MILLISECONDS)
+ .atMost(400L, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ String responseBody =
getSelectResponse(MULTI_DATASOURCE_1);
+ String expectedDataRow =
+
"\"id\":1,\"val_bool\":\"true\",\"val_tinyint\":1,\"val_smallint\":2,\"val_int\":3,\"val_bigint\":4,\"val_float\":4.3,\"val_double\":5.3,\"val_decimal\":6.3,\"val_string\":\"NEW\"";
+
Assertions.assertFalse(responseBody.contains("errorMessage"));
+
Assertions.assertTrue(responseBody.contains(expectedDataRow));
+ });
+
// Check multi sink table 2
- while (true) {
- String responseBody = getSelectResponse(MULTI_DATASOURCE_2);
- String expectedDataRow =
-
"\"id\":1,\"val_bool\":\"true\",\"val_tinyint\":1,\"val_smallint\":2,\"val_int\":3,\"val_bigint\":4,\"val_float\":4.3,\"val_double\":5.3,\"val_decimal\":6.3";
- if (!responseBody.contains("errorMessage")) {
-
Assertions.assertEquals(responseBody.contains(expectedDataRow), true);
- break;
- }
- Thread.sleep(1000);
- }
+ given().ignoreExceptions()
+ .await()
+ .atLeast(100L, TimeUnit.MILLISECONDS)
+ .atMost(400L, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ String responseBody =
getSelectResponse(MULTI_DATASOURCE_2);
+ String expectedDataRow =
+
"\"id\":1,\"val_bool\":\"true\",\"val_tinyint\":1,\"val_smallint\":2,\"val_int\":3,\"val_bigint\":4,\"val_float\":4.3,\"val_double\":5.3,\"val_decimal\":6.3";
+
Assertions.assertFalse(responseBody.contains("errorMessage"));
+
Assertions.assertTrue(responseBody.contains(expectedDataRow));
+ });
}
private void changeCoordinatorURLConf(String resourceFilePath) throws
UnknownHostException {
@@ -184,8 +192,7 @@ public class DruidIT extends TestSuiteBase implements
TestResource {
entity.setContentType("application/json");
request.setEntity(entity);
HttpResponse response = client.execute(request);
- String responseBody = EntityUtils.toString(response.getEntity());
- return responseBody;
+ return EntityUtils.toString(response.getEntity());
}
}
}