idatasing-wangwei opened a new issue, #5394:
URL: https://github.com/apache/seatunnel/issues/5394

   ### Search before asking
   
   - [X] I had searched in the 
[issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22)
 and found no similar issues.
   
   
   ### What happened
   
   Example scenario when consuming Kafka data: one partition (subtask) of the computing engine consumes three Kafka partitions at the same time; two of those Kafka partitions have no data and only one has data. Consuming the data becomes particularly slow.
   
   ### SeaTunnel Version
   
   2.3.0
   2.3.1
   2.3.2
   
   
   ### SeaTunnel Config
   
   ```conf
   {
     "env" : {
       "job.name" : "5d691c7ba53040afb36d32eda1c2ef6b",
       "execution.parallelism" : 1,
       "execution.planner" : "blink",
       "job.mode" : "STREAMING",
       "execution.checkpoint.interval" : 60000,
       "execution.checkpoint.mode" : "exactly-once",
       "execution.checkpoint.data-uri" : "hdfs://ns/user/hzx/seatunnel/jod",
       "execution.state.backend" : "rocksdb",
       "execution.checkpoint.cleanup-mode" : false,
       "execution.restart.strategy" : "failure-rate",
       "execution.restart.failureRate" : 5,
       "execution.restart.failureInterval" : 300000,
       "execution.restart.delayInterval" : 10000
     },
     "source" : [ {
       "bootstrap.servers" : "*****:9092",
       "consumer.group" : "****",
       "format" : "json",
       "kafka.acks" : "1",
       "kafka.max.poll.records" : "100000",
       "topic" : "t_ybrz",
       "schema" : {
         "fields" : {
           "usernum" : "string",
           "relatenum" : "string",
           "time" : "string",
           "bid" : "string",
           "url" : "string"
         }
       },
       "start_mode" : "latest",
       "plugin_name" : "Kafka",
       "result_table_name" : "t_ybrz_sr",
       "parallelism" : 1
     } ],
     "sink" : [ {
       "metastore_uri" : "thrift://****:9083",
       "hdfs_site_path" : "hdfs-site.xml",
       "hive.metastore.sasl.enabled" : true,
       "plugin_name" : "Hive",
       "table_name" : "test.t_ybrz",
       "source_table_name" : "t_ybrz_sr",
       "parallelism" : 1
     } ]
   }
   ```
   
   
   ### Running Command
   
   ```shell
   ${SEATUNNEL_HOME}/bin/start-seatunnel-flink.sh --config /home/hzx/seatunnel-conf/5d691c7ba53040afb36d32eda1c2ef6b.json --run-mode run -m yarn-cluster -yqu default -ynm 5d691c7ba53040afb36d32eda1c2ef6b -ys 1 -yjm 4GB -ytm 4GB -sae
   ```
   
   
   ### Error Exception
   
   ```log
   No error exception.
   ```
   
   
   ### Zeta or Flink or Spark Version
   
   flink 1.13
   
   ### Java or Scala Version
   
   java 1.8.0_362
   
   ### Screenshots
   
   Source code problem analysis: because `completableFuture.join()` is called inside the `sourceSplits.forEach` loop, each split blocks the loop before the next split is handled, so the thread pool cannot be used effectively.
   
   Example scenario: with three Kafka partitions, the first partition has no data and blocks for 10 seconds, the second partition also has no data and blocks for another 10 seconds, and only the third partition has data and is consumed. In other words, 20+ seconds elapse for a single round of consumption.
   `org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceReader`:
   
   ```java
   public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
       if (!running) {
           Thread.sleep(THREAD_WAIT_TIME);
           return;
       }
   
       while (pendingPartitionsQueue.size() != 0) {
           sourceSplits.add(pendingPartitionsQueue.poll());
       }
       sourceSplits.forEach(sourceSplit ->
               consumerThreadMap.computeIfAbsent(sourceSplit.getTopicPartition(), s -> {
                   KafkaConsumerThread thread = new KafkaConsumerThread(metadata);
                   executorService.submit(thread);
                   return thread;
               }));
       sourceSplits.forEach(sourceSplit -> {
           CompletableFuture<Void> completableFuture = new CompletableFuture<>();
           try {
               consumerThreadMap.get(sourceSplit.getTopicPartition()).getTasks().put(consumer -> {
                   try {
                       Set<TopicPartition> partitions = Sets.newHashSet(sourceSplit.getTopicPartition());
                       StringDeserializer stringDeserializer = new StringDeserializer();
                       stringDeserializer.configure(Maps.fromProperties(this.metadata.getProperties()), false);
                       consumer.assign(partitions);
                       if (sourceSplit.getStartOffset() >= 0) {
                           consumer.seek(sourceSplit.getTopicPartition(), sourceSplit.getStartOffset());
                       }
                       ConsumerRecords<byte[], byte[]> records =
                               consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
                       for (TopicPartition partition : partitions) {
                           List<ConsumerRecord<byte[], byte[]>> recordList = records.records(partition);
                           for (ConsumerRecord<byte[], byte[]> record : recordList) {
                               try {
                                   deserializationSchema.deserialize(record.value(), output);
                               } catch (Exception e) {
                                   if (this.messageFormatErrorHandleWay == MessageFormatErrorHandleWay.SKIP) {
                                       log.warn(
                                               "Deserialize message failed, skip this message, message: {}",
                                               new String(record.value()));
                                       continue;
                                   }
                                   throw e;
                               }
   
                               if (Boundedness.BOUNDED.equals(context.getBoundedness())
                                       && record.offset() >= sourceSplit.getEndOffset()) {
                                   break;
                               }
                           }
                           long lastOffset = -1;
                           if (!recordList.isEmpty()) {
                               lastOffset = recordList.get(recordList.size() - 1).offset();
                               sourceSplit.setStartOffset(lastOffset + 1);
                           }
                           if (lastOffset >= sourceSplit.getEndOffset()) {
                               sourceSplit.setEndOffset(lastOffset);
                           }
                       }
                   } catch (Exception e) {
                       completableFuture.completeExceptionally(e);
                   }
                   completableFuture.complete(null);
               });
           } catch (InterruptedException e) {
               throw new KafkaConnectorException(KafkaConnectorErrorCode.CONSUME_DATA_FAILED, e);
           }
           completableFuture.join();
       });
   
       if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
           // signal to the source that we have reached the end of the data.
           context.signalNoMoreElement();
       }
   }
   ```
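   The cumulative blocking described above can be reproduced with plain `CompletableFuture`, independent of Kafka. A minimal sketch (all names are illustrative, not from the connector; `pollEmptyPartition` stands in for a `consumer.poll` that hits the full `POLL_TIMEOUT` on an empty partition):
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   
   public class JoinInLoopDemo {
       // Stand-in for polling an empty partition: blocks for the full timeout.
       static void pollEmptyPartition(long timeoutMs) {
           try {
               Thread.sleep(timeoutMs);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
           }
       }
   
       // Pattern in the current reader: join() inside the loop, so the waits add up.
       static long runSequential(ExecutorService pool, long timeoutMs, int splits) {
           long start = System.currentTimeMillis();
           for (int i = 0; i < splits; i++) {
               CompletableFuture.runAsync(() -> pollEmptyPartition(timeoutMs), pool).join();
           }
           return System.currentTimeMillis() - start;
       }
   
       // Pattern in the proposed fix: submit all splits, then wait once with allOf().
       static long runConcurrent(ExecutorService pool, long timeoutMs, int splits) {
           long start = System.currentTimeMillis();
           List<CompletableFuture<Void>> futures = new ArrayList<>();
           for (int i = 0; i < splits; i++) {
               futures.add(CompletableFuture.runAsync(() -> pollEmptyPartition(timeoutMs), pool));
           }
           CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
           return System.currentTimeMillis() - start;
       }
   
       public static void main(String[] args) {
           ExecutorService pool = Executors.newFixedThreadPool(3);
           // With 3 splits and a 200 ms timeout, the sequential version takes
           // roughly 3 x 200 ms, while the concurrent one takes roughly 200 ms.
           System.out.println("sequential: " + runSequential(pool, 200, 3) + " ms");
           System.out.println("concurrent: " + runConcurrent(pool, 200, 3) + " ms");
           pool.shutdown();
       }
   }
   ```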
   This is my modified code, but there are still some problems: in some special scenarios 10 seconds are still wasted, and dynamic partition discovery is delayed by one minute.
   
   Also, I do not know what these two lines mean:
   
   ```java
   StringDeserializer stringDeserializer = new StringDeserializer();
   stringDeserializer.configure(Maps.fromProperties(this.metadata.getProperties()), false);
   ```
   
   ```java
   public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
       if (!running) {
           Thread.sleep(THREAD_WAIT_TIME);
           return;
       }
   
       while (pendingPartitionsQueue.size() != 0) {
           sourceSplits.add(pendingPartitionsQueue.poll());
       }
       sourceSplits.forEach(sourceSplit ->
               consumerThreadMap.computeIfAbsent(sourceSplit.getTopicPartition(), s -> {
                   KafkaConsumerThread thread = new KafkaConsumerThread(metadata);
                   executorService.submit(thread);
                   return thread;
               }));
       ArrayList<CompletableFuture<Void>> completableFutures = new ArrayList<>(sourceSplits.size());
       sourceSplits.forEach(sourceSplit -> {
           CompletableFuture<Void> completableFuture = new CompletableFuture<>();
           completableFutures.add(completableFuture);
           try {
               consumerThreadMap.get(sourceSplit.getTopicPartition()).getTasks().put(consumer -> {
                   boolean flag = true;
                   long now = System.currentTimeMillis();
                   while (flag) {
                       try {
                           Set<TopicPartition> partitions = Sets.newHashSet(sourceSplit.getTopicPartition());
                           StringDeserializer stringDeserializer = new StringDeserializer();
                           stringDeserializer.configure(Maps.fromProperties(this.metadata.getProperties()), false);
                           consumer.assign(partitions);
                           if (sourceSplit.getStartOffset() >= 0) {
                               consumer.seek(sourceSplit.getTopicPartition(), sourceSplit.getStartOffset());
                           }
                           ConsumerRecords<byte[], byte[]> records =
                                   consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
                           for (TopicPartition partition : partitions) {
                               List<ConsumerRecord<byte[], byte[]>> recordList = records.records(partition);
                               for (ConsumerRecord<byte[], byte[]> record : recordList) {
                                   try {
                                       deserializationSchema.deserialize(record.value(), output);
                                   } catch (Exception e) {
                                       if (this.messageFormatErrorHandleWay == MessageFormatErrorHandleWay.SKIP) {
                                           log.warn(
                                                   "Deserialize message failed, skip this message, message: {}",
                                                   new String(record.value()));
                                           continue;
                                       }
                                       throw e;
                                   }
   
                                   if (Boundedness.BOUNDED.equals(context.getBoundedness())
                                           && record.offset() >= sourceSplit.getEndOffset()) {
                                       break;
                                   }
                               }
                               long lastOffset = -1;
                               if (!recordList.isEmpty()) {
                                   lastOffset = recordList.get(recordList.size() - 1).offset();
                                   sourceSplit.setStartOffset(lastOffset + 1);
                               }
                               if (lastOffset >= sourceSplit.getEndOffset()) {
                                   sourceSplit.setEndOffset(lastOffset);
                               }
                               if ((System.currentTimeMillis() - now) > 60000) {
                                   flag = false;
                               }
                           }
                       } catch (Exception e) {
                           completableFuture.completeExceptionally(e);
                       }
                   }
                   completableFuture.complete(null);
               });
           } catch (InterruptedException e) {
               throw new KafkaConnectorException(KafkaConnectorErrorCode.CONSUME_DATA_FAILED, e);
           }
       });
       CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).join();
       if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
           // signal to the source that we have reached the end of the data.
           context.signalNoMoreElement();
       }
   }
   ```
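   For reference, `CompletableFuture.allOf(...).join()` as used in the modified code waits for every future and rethrows the first failure wrapped in a `CompletionException`. A minimal stdlib sketch, independent of the connector (class and message names are illustrative):
   
   ```java
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CompletionException;
   
   public class AllOfDemo {
       // Waits for all futures; returns "ok" on success, or the failure message.
       public static String waitForAll(List<CompletableFuture<Void>> futures) {
           try {
               CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
               return "ok";
           } catch (CompletionException e) {
               // join() wraps the original exception; unwrap it for reporting.
               return e.getCause().getMessage();
           }
       }
   
       public static void main(String[] args) {
           CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
           CompletableFuture<Void> failed = new CompletableFuture<>();
           failed.completeExceptionally(new IllegalStateException("poll failed"));
           System.out.println(waitForAll(List.of(done)));          // prints "ok"
           System.out.println(waitForAll(List.of(done, failed)));  // prints "poll failed"
       }
   }
   ```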
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   

