zhangshenghang commented on code in PR #10013:
URL: https://github.com/apache/seatunnel/pull/10013#discussion_r2540887196
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java:
##########
@@ -1410,26 +1410,85 @@ public void testKafkaProtobufToAssert(TestContainer container)
@TestTemplate
@DisabledOnContainer(
- type = EngineType.SPARK,
+ type = {EngineType.SPARK, EngineType.FLINK},
value = {})
- public void testKafkaToKafkaExactlyOnceOnStreaming(TestContainer container)
- throws InterruptedException {
+ public void testRestoreKafkaToKafkaExactlyOnceOnStreaming(TestContainer container)
+ throws InterruptedException, IOException {
+
String producerTopic = "kafka_topic_exactly_once_1";
String consumerTopic = "kafka_topic_exactly_once_2";
String sourceData = "Seatunnel Exactly Once Example";
+ final String jobId = "18696753645413";
for (int i = 0; i < 10; i++) {
ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>(producerTopic, null, sourceData.getBytes());
producer.send(record);
producer.flush();
}
- Long endOffset = 0l;
- try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) {
- consumer.subscribe(Arrays.asList(producerTopic));
- Map<TopicPartition, Long> offsets =
- consumer.endOffsets(Arrays.asList(new TopicPartition(producerTopic, 0)));
- endOffset = offsets.entrySet().iterator().next().getValue();
+ // async execute
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ container.executeJob(
+ "/kafka/kafka_to_kafka_exactly_once_streaming.conf", jobId);
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+ // wait for data written to kafka
+ given().pollDelay(60, SECONDS)
+ .pollInterval(5, SECONDS)
+ .await()
+ .atMost(5, MINUTES)
+ .untilAsserted(
+ () -> Assertions.assertTrue(checkData(consumerTopic, 10, sourceData)));
+
+ // Savepoint the running job (so restore should continue from this position).
+ container.savepointJob(jobId);
+
+ for (int i = 0; i < 10; i++) {
+ ProducerRecord<byte[], byte[]> record =
+ new ProducerRecord<>(producerTopic, null, sourceData.getBytes());
+ producer.send(record);
+ producer.flush();
+ }
+
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ container.restoreJob(
+ "/kafka/kafka_to_kafka_exactly_once_streaming.conf", jobId);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ given().pollDelay(60, SECONDS)
+ .pollInterval(5, SECONDS)
+ .await()
+ .atMost(10, MINUTES)
+ .untilAsserted(
+ () -> Assertions.assertTrue(checkData(consumerTopic, 10, sourceData)));
Review Comment:
The data is verified in checkData. Please use a different payload for the records produced after the savepoint/restore than for those produced before it. Currently both batches send the same sourceData.getBytes(), so the post-restore assertion cannot distinguish records written after the restore from those written before the savepoint.
--
This is an automated message from the Apache Git Service.