[
https://issues.apache.org/jira/browse/FLINK-28576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17567828#comment-17567828
]
Echo Lee commented on FLINK-28576:
----------------------------------
[~martijnvisser] Thanks for your reply. I simply print the elapsed time for
every 100,000 records pulled: the old API takes an average of 60 seconds per
batch, while the new one takes only about 4 seconds.
I think my use case is simple, but I don't know where the problem is.
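
For reference, the measurement described above can be sketched without Flink as a small per-batch timer. This is only an illustrative sketch: the class name BatchTimer, the injectable clock, and the use of System.nanoTime instead of System.currentTimeMillis are assumptions, not part of the job in this issue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/** Hypothetical helper: records how long each batch of N records takes. */
public class BatchTimer {
    private final int batchSize;
    private final LongSupplier nanoClock;
    private final List<Long> batchMillis = new ArrayList<>();
    private long count = 0;
    private long batchStart;

    public BatchTimer(int batchSize, LongSupplier nanoClock) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.nanoClock = nanoClock;
        // The clock starts at construction, as in the job quoted in this issue.
        this.batchStart = nanoClock.getAsLong();
    }

    /** Call once per record; returns true when a batch has just completed. */
    public boolean record() {
        count++;
        if (count % batchSize != 0) {
            return false;
        }
        long now = nanoClock.getAsLong();
        batchMillis.add((now - batchStart) / 1_000_000);
        batchStart = now;
        return true;
    }

    /** Elapsed milliseconds of every completed batch, in order. */
    public List<Long> batchMillis() {
        return batchMillis;
    }

    /** Average batch duration in milliseconds, or NaN if no batch completed. */
    public double averageMillis() {
        return batchMillis.stream().mapToLong(Long::longValue).average().orElse(Double.NaN);
    }

    public static void main(String[] args) {
        BatchTimer timer = new BatchTimer(100_000, System::nanoTime);
        for (int i = 0; i < 300_000; i++) {
            if (timer.record()) {
                List<Long> batches = timer.batchMillis();
                System.out.println(batches.get(batches.size() - 1) + " ms");
            }
        }
        System.out.println("average: " + timer.averageMillis() + " ms");
    }
}
```

Inside a MapFunction, record() would be called once per element in place of the manual counter, so both jobs report per-batch times and an average in the same way.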
> Kafka's two source api performance differences
> ----------------------------------------------
>
> Key: FLINK-28576
> URL: https://issues.apache.org/jira/browse/FLINK-28576
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Reporter: Echo Lee
> Priority: Major
>
> I recently found that the new Kafka source API is about 10 times more
> performant than the old one, but I don't know what is causing the difference.
>
> {code:java}
> // new source api
> KafkaSource<String> source = KafkaSource.<String>builder()
>         .setBootstrapServers(brokers)
>         .setTopics("benchmark")
>         .setGroupId("my-group")
>         .setStartingOffsets(OffsetsInitializer.earliest())
>         .setBounded(OffsetsInitializer.latest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .build();
> env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
>         .map(new MapFunction<String, Object>() {
>             private int count = 0;
>             private long lastTime = System.currentTimeMillis();
>
>             @Override
>             public Object map(String value) throws Exception {
>                 count++;
>                 if (count % 100000 == 0) {
>                     long currentTime = System.currentTimeMillis();
>                     System.out.println(currentTime - lastTime);
>                     lastTime = currentTime;
>                 }
>                 return null;
>             }
>         });{code}
>
>
>
> {code:java}
> // old source api
> FlinkKafkaConsumer<String> flinkKafkaConsumer =
>         new FlinkKafkaConsumer<String>("benchmark", new SimpleStringSchema(), properties);
> flinkKafkaConsumer.setStartFromEarliest();
> env.addSource(flinkKafkaConsumer)
>         .map(new MapFunction<String, Object>() {
>             private int count = 0;
>             private long lastTime = System.currentTimeMillis();
>
>             @Override
>             public Object map(String value) throws Exception {
>                 count++;
>                 if (count % 100000 == 0) {
>                     long currentTime = System.currentTimeMillis();
>                     System.out.println(currentTime - lastTime);
>                     lastTime = currentTime;
>                 }
>                 return null;
>             }
>         });{code}
>
> Both approaches read the same data from the same topic.
> Flink version: 1.14.x
> Size of a single record: 1k
>
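
As a rough cross-check, the batch times reported in the comment above (60 seconds and 4 seconds per 100,000 records) can be converted into throughput. This is a back-of-the-envelope sketch: the class name ThroughputEstimate is hypothetical, and reading "1k" as 1024 bytes per record is an assumption.

```java
/** Hypothetical helper: converts the reported batch times into throughput figures. */
public class ThroughputEstimate {

    /** Records processed per second for a batch of the given size. */
    static double recordsPerSecond(long records, double seconds) {
        return records / seconds;
    }

    /** Data rate in MiB/s, given a fixed record size in bytes. */
    static double mebibytesPerSecond(long records, long bytesPerRecord, double seconds) {
        return records * (double) bytesPerRecord / seconds / (1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        long batch = 100_000;      // batch size used in the map function
        long recordBytes = 1024;   // assumption: "1k" means 1024 bytes per record
        double oldSeconds = 60.0;  // reported average for the old API
        double newSeconds = 4.0;   // reported average for the new API

        System.out.printf("old API: %.0f records/s, %.1f MiB/s%n",
                recordsPerSecond(batch, oldSeconds),
                mebibytesPerSecond(batch, recordBytes, oldSeconds));
        System.out.printf("new API: %.0f records/s, %.1f MiB/s%n",
                recordsPerSecond(batch, newSeconds),
                mebibytesPerSecond(batch, recordBytes, newSeconds));
        System.out.printf("ratio: %.0fx%n", oldSeconds / newSeconds);
    }
}
```

Under these assumptions the figures work out to roughly 1,667 versus 25,000 records per second, a ratio of 15, which is somewhat larger than the "10 times" stated in the description.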