This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d564e88926453cfa05863255031999da155b833b Author: JunRuiLee <[email protected]> AuthorDate: Wed Sep 4 13:37:15 2024 +0800 [FLINK-36185][e2e] Rewrite end to end cases with new Kafka source. Co-authored-by: Qingsheng Ren <[email protected]> --- .../registry/test/TestAvroConsumerConfluent.java | 33 +++++++++++++--------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java b/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java index e3f41497899..abf5f7104ad 100644 --- a/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java +++ b/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java @@ -17,21 +17,23 @@ package org.apache.flink.schema.registry.test; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; +import org.apache.flink.connector.kafka.source.KafkaSource; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; +import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema; import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import example.avro.User; import org.apache.avro.specific.SpecificRecordBase; -import org.apache.kafka.clients.producer.ProducerConfig; import java.util.Properties; @@ -64,22 +66,28 @@ public class TestAvroConsumerConfluent { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - DataStreamSource<User> input = - env.addSource( - new FlinkKafkaConsumer<>( - parameterTool.getRequired("input-topic"), + String bootstrapServers = parameterTool.getRequired("bootstrap.servers"); + KafkaSource<User> kafkaSource = + KafkaSource.<User>builder() + .setBootstrapServers(bootstrapServers) + .setGroupId(parameterTool.getRequired("group.id")) + .setTopics(parameterTool.getRequired("input-topic")) + .setDeserializer( + KafkaRecordDeserializationSchema.valueOnly( ConfluentRegistryAvroDeserializationSchema.forSpecific( - User.class, schemaRegistryUrl), - config) - .setStartFromEarliest()); + User.class, schemaRegistryUrl))) + .setStartingOffsets(OffsetsInitializer.earliest()) + .build(); + + DataStreamSource<User> input = + env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source"); SingleOutputStreamOperator<String> mapToString = input.map((MapFunction<User, String>) SpecificRecordBase::toString); KafkaSink<String> stringSink = KafkaSink.<String>builder() - .setBootstrapServers( - config.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) + .setBootstrapServers(bootstrapServers) .setRecordSerializer( KafkaRecordSerializationSchema.builder() .setValueSerializationSchema(new SimpleStringSchema()) @@ -91,8 +99,7 @@ public class TestAvroConsumerConfluent { KafkaSink<User> avroSink = KafkaSink.<User>builder() - .setBootstrapServers( - config.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) + .setBootstrapServers(bootstrapServers) .setRecordSerializer( KafkaRecordSerializationSchema.builder() .setValueSerializationSchema(
