WenDing-Y commented on code in PR #9359:
URL: https://github.com/apache/seatunnel/pull/9359#discussion_r2107338367
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java:
##########
@@ -143,6 +145,26 @@ public void startUp() throws Exception {
.atMost(180, TimeUnit.SECONDS)
.untilAsserted(this::initKafkaProducer);
+ Properties adminProps = new Properties();
+ adminProps.put(
+ AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
+ // Set the retention time to -1 to read data older than 7 days.
+ try (AdminClient adminClient = AdminClient.create(adminProps)) {
+ NewTopic testTopicSource = new NewTopic("test_topic_source", 1, (short) 1);
+ testTopicSource.configs(Collections.singletonMap("retention.ms", "-1"));
+
+ NewTopic testTopicNativeSource = new NewTopic("test_topic_native_source", 1, (short) 1);
+ testTopicNativeSource.configs(Collections.singletonMap("retention.ms", "-1"));
+
+ NewTopic testTopicSourceWithTimestamp =
+ new NewTopic("test_topic_source_timestamp", 1, (short) 1);
+ testTopicSourceWithTimestamp.configs(Collections.singletonMap("retention.ms", "-1"));
+
+ adminClient.createTopics(Collections.singletonList(testTopicSource));
+ adminClient.createTopics(Collections.singletonList(testTopicNativeSource));
+ adminClient.createTopics(Collections.singletonList(testTopicSourceWithTimestamp));
Review Comment:
done