This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3b649727ef [Improve][E2E] improve kafka e2e (#8295)
3b649727ef is described below
commit 3b649727efa5f6ff305f4463fa78ca21c57313b8
Author: zhangdonghao <[email protected]>
AuthorDate: Sun Dec 15 10:30:35 2024 +0800
[Improve][E2E] improve kafka e2e (#8295)
---
.../org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java | 13 ++++++-------
1 file changed, 6 insertions(+), 7 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index f9483fd65f..b199a2848a 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -75,7 +75,6 @@ import org.testcontainers.containers.Container;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.DockerLoggerFactory;
@@ -104,7 +103,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import java.util.stream.Stream;
-import static org.awaitility.Awaitility.await;
+import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
@Slf4j
public class KafkaIT extends TestSuiteBase implements TestResource {
@@ -132,8 +131,7 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));
Startables.deepStart(Stream.of(kafkaContainer)).join();
log.info("Kafka container started");
- Awaitility.given()
- .ignoreExceptions()
+ given().ignoreExceptions()
.atLeast(100, TimeUnit.MILLISECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(180, TimeUnit.SECONDS)
@@ -789,11 +787,12 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
}
return null;
});
- TimeUnit.MINUTES.sleep(5);
// wait for data written to kafka
Long finalEndOffset = endOffset;
- await().atMost(5, TimeUnit.MINUTES)
- .pollInterval(5000, TimeUnit.MILLISECONDS)
+ given().pollDelay(30, TimeUnit.SECONDS)
+ .pollInterval(5, TimeUnit.SECONDS)
+ .await()
+ .atMost(5, TimeUnit.MINUTES)
.untilAsserted(
() ->
Assertions.assertTrue(