This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d564e88926453cfa05863255031999da155b833b
Author: JunRuiLee <[email protected]>
AuthorDate: Wed Sep 4 13:37:15 2024 +0800

    [FLINK-36185][e2e] Rewrite end to end cases with new Kafka source.
    
    Co-authored-by: Qingsheng Ren <[email protected]>
---
 .../registry/test/TestAvroConsumerConfluent.java   | 33 +++++++++++++---------
 1 file changed, 20 insertions(+), 13 deletions(-)

diff --git a/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java b/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
index e3f41497899..abf5f7104ad 100644
--- a/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
+++ b/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
@@ -17,21 +17,23 @@
 
 package org.apache.flink.schema.registry.test;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
 import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 
 import example.avro.User;
 import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.kafka.clients.producer.ProducerConfig;
 
 import java.util.Properties;
 
@@ -64,22 +66,28 @@ public class TestAvroConsumerConfluent {
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-        DataStreamSource<User> input =
-                env.addSource(
-                        new FlinkKafkaConsumer<>(
-                                        
parameterTool.getRequired("input-topic"),
+        String bootstrapServers = parameterTool.getRequired("bootstrap.servers");
+        KafkaSource<User> kafkaSource =
+                KafkaSource.<User>builder()
+                        .setBootstrapServers(bootstrapServers)
+                        .setGroupId(parameterTool.getRequired("group.id"))
+                        .setTopics(parameterTool.getRequired("input-topic"))
+                        .setDeserializer(
+                                KafkaRecordDeserializationSchema.valueOnly(
                                         
ConfluentRegistryAvroDeserializationSchema.forSpecific(
-                                                User.class, schemaRegistryUrl),
-                                        config)
-                                .setStartFromEarliest());
+                                                User.class, schemaRegistryUrl)))
+                        .setStartingOffsets(OffsetsInitializer.earliest())
+                        .build();
+
+        DataStreamSource<User> input =
+                env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
 
         SingleOutputStreamOperator<String> mapToString =
                 input.map((MapFunction<User, String>) SpecificRecordBase::toString);
 
         KafkaSink<String> stringSink =
                 KafkaSink.<String>builder()
-                        .setBootstrapServers(
-                                
config.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
+                        .setBootstrapServers(bootstrapServers)
                         .setRecordSerializer(
                                 KafkaRecordSerializationSchema.builder()
                                         .setValueSerializationSchema(new SimpleStringSchema())
@@ -91,8 +99,7 @@ public class TestAvroConsumerConfluent {
 
         KafkaSink<User> avroSink =
                 KafkaSink.<User>builder()
-                        .setBootstrapServers(
-                                
config.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
+                        .setBootstrapServers(bootstrapServers)
                         .setRecordSerializer(
                                 KafkaRecordSerializationSchema.builder()
                                         .setValueSerializationSchema(

Reply via email to