This is an automated email from the ASF dual-hosted git repository. rmetzger pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 06d4828423e2d4e29fe5ddf5710ca651805e5d7a Author: Fabian Paul <[email protected]> AuthorDate: Wed Sep 15 12:46:41 2021 +0200 [FLINK-24292][connectors/kafka] Use KafkaSink in examples instead of FlinkKafkaProducer --- .../flink/streaming/kafka/test/KafkaExample.java | 24 ++++++++++++++-------- .../statemachine/KafkaEventsGeneratorJob.java | 15 ++++++++++++-- 2 files changed, 29 insertions(+), 10 deletions(-) diff --git a/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java b/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java index 36988b7..01e4819 100644 --- a/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java +++ b/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java @@ -18,17 +18,19 @@ package org.apache.flink.streaming.kafka.test; import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; +import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; -import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper; import org.apache.flink.streaming.kafka.test.base.CustomWatermarkExtractor; import org.apache.flink.streaming.kafka.test.base.KafkaEvent; import org.apache.flink.streaming.kafka.test.base.KafkaEventSchema; import org.apache.flink.streaming.kafka.test.base.KafkaExampleUtil; import org.apache.flink.streaming.kafka.test.base.RollingAdditionMapper; +import org.apache.kafka.clients.producer.ProducerConfig; + /** * A simple example that shows how to read from and write to modern Kafka. This will read String * messages from the input topic, parse them into a POJO type {@link KafkaEvent}, group by some key, @@ -60,12 +62,18 @@ public class KafkaExample extends KafkaExampleUtil { .keyBy("word") .map(new RollingAdditionMapper()); - input.addSink( - new FlinkKafkaProducer<>( - parameterTool.getRequired("output-topic"), - new KeyedSerializationSchemaWrapper<>(new KafkaEventSchema()), - parameterTool.getProperties(), - FlinkKafkaProducer.Semantic.EXACTLY_ONCE)); + input.sinkTo( + KafkaSink.<KafkaEvent>builder() + .setBootstrapServers( + parameterTool + .getProperties() + .getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) + .setRecordSerializer( + KafkaRecordSerializationSchema.builder() + .setTopic(parameterTool.getRequired("output-topic")) + .setValueSerializationSchema(new KafkaEventSchema()) + .build()) + .build()); env.execute("Modern Kafka Example"); } diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java index 0d3d476..822d17c 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java @@ -19,8 +19,10 @@ package org.apache.flink.streaming.examples.statemachine; import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; +import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; +import org.apache.flink.streaming.examples.statemachine.event.Event; import org.apache.flink.streaming.examples.statemachine.generator.EventsGeneratorSource; import org.apache.flink.streaming.examples.statemachine.kafka.EventDeSerializer; @@ -47,7 +49,16 @@ public class KafkaEventsGeneratorJob { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.addSource(new EventsGeneratorSource(errorRate, sleep)) - .addSink(new FlinkKafkaProducer<>(brokers, kafkaTopic, new EventDeSerializer())); + .sinkTo( + KafkaSink.<Event>builder() + .setBootstrapServers(brokers) + .setRecordSerializer( + KafkaRecordSerializationSchema.builder() + .setValueSerializationSchema( + new EventDeSerializer()) + .setTopic(kafkaTopic) + .build()) + .build()); // trigger program execution env.execute("State machine example Kafka events generator job");
