idatasing-wangwei opened a new issue, #5394: URL: https://github.com/apache/seatunnel/issues/5394
### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

Example scenario: when consuming Kafka data, if one partition (subtask) of the computing engine consumes three Kafka partitions at the same time, and two of those partitions have no data while only one has data, consumption becomes particularly slow.

### SeaTunnel Version

2.3.0, 2.3.1, 2.3.2

### SeaTunnel Config

```conf
{
  "env" : {
    "job.name" : "5d691c7ba53040afb36d32eda1c2ef6b",
    "execution.parallelism" : 1,
    "execution.planner" : "blink",
    "job.mode" : "STREAMING",
    "execution.checkpoint.interval" : 60000,
    "execution.checkpoint.mode" : "exactly-once",
    "execution.checkpoint.data-uri" : "hdfs://ns/user/hzx/seatunnel/jod",
    "execution.state.backend" : "rocksdb",
    "execution.checkpoint.cleanup-mode" : false,
    "execution.restart.strategy" : "failure-rate",
    "execution.restart.failureRate" : 5,
    "execution.restart.failureInterval" : 300000,
    "execution.restart.delayInterval" : 10000
  },
  "source" : [
    {
      "bootstrap.servers" : "*****:9092",
      "consumer.group" : "****",
      "format" : "json",
      "kafka.acks" : "1",
      "kafka.max.poll.records" : "100000",
      "topic" : "t_ybrz",
      "schema" : {
        "fields" : {
          "usernum" : "string",
          "relatenum" : "string",
          "time" : "string",
          "bid" : "string",
          "url" : "string"
        }
      },
      "start_mode" : "latest",
      "plugin_name" : "Kafka",
      "result_table_name" : "t_ybrz_sr",
      "parallelism" : 1
    }
  ],
  "sink" : [
    {
      "metastore_uri" : "thrift://****:9083",
      "hdfs_site_path" : "hdfs-site.xml",
      "hive.metastore.sasl.enabled" : true,
      "plugin_name" : "Hive",
      "table_name" : "test.t_ybrz",
      "source_table_name" : "t_ybrz_sr",
      "parallelism" : 1
    }
  ]
}
```

### Running Command

```shell
${SEATUNNEL_HOME}/bin/start-seatunnel-flink.sh --config /home/hzx/seatunnel-conf/5d691c7ba53040afb36d32eda1c2ef6b.json \
  --run-mode run -m yarn-cluster -yqu default -ynm 5d691c7ba53040afb36d32eda1c2ef6b -ys 1 -yjm 4GB -ytm 4GB -sae
```

### Error Exception

```log
no error exception
```

### Zeta or Flink or Spark Version

flink 1.13

### Java or Scala Version

java 1.8.0_362

### Screenshots

Source code problem analysis: because `completableFuture.join()` is called inside the `sourceSplits.forEach` loop, each split blocks the loop in turn and the thread pool cannot be used effectively. Example scenario: with 3 Kafka partitions, the first partition has no data and blocks for 10 seconds, the second also has no data and blocks for another 10 seconds, and only the third has data and is consumed; that is 20+ seconds spent for a single round of consumption.

`org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceReader`:

```java
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
    if (!running) {
        Thread.sleep(THREAD_WAIT_TIME);
        return;
    }
    while (pendingPartitionsQueue.size() != 0) {
        sourceSplits.add(pendingPartitionsQueue.poll());
    }
    sourceSplits.forEach(sourceSplit ->
            consumerThreadMap.computeIfAbsent(sourceSplit.getTopicPartition(), s -> {
                KafkaConsumerThread thread = new KafkaConsumerThread(metadata);
                executorService.submit(thread);
                return thread;
            }));
    sourceSplits.forEach(sourceSplit -> {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        try {
            consumerThreadMap.get(sourceSplit.getTopicPartition()).getTasks().put(consumer -> {
                try {
                    Set<TopicPartition> partitions = Sets.newHashSet(sourceSplit.getTopicPartition());
                    StringDeserializer stringDeserializer = new StringDeserializer();
                    stringDeserializer.configure(Maps.fromProperties(this.metadata.getProperties()), false);
                    consumer.assign(partitions);
                    if (sourceSplit.getStartOffset() >= 0) {
                        consumer.seek(sourceSplit.getTopicPartition(), sourceSplit.getStartOffset());
                    }
                    ConsumerRecords<byte[], byte[]> records =
                            consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
                    for (TopicPartition partition : partitions) {
                        List<ConsumerRecord<byte[], byte[]>> recordList = records.records(partition);
                        for (ConsumerRecord<byte[], byte[]> record : recordList) {
                            try {
                                deserializationSchema.deserialize(record.value(), output);
                            } catch (Exception e) {
                                if (this.messageFormatErrorHandleWay == MessageFormatErrorHandleWay.SKIP) {
                                    log.warn("Deserialize message failed, skip this message, message: {}",
                                            new String(record.value()));
                                    continue;
                                }
                                throw e;
                            }
                            if (Boundedness.BOUNDED.equals(context.getBoundedness())
                                    && record.offset() >= sourceSplit.getEndOffset()) {
                                break;
                            }
                        }
                        long lastOffset = -1;
                        if (!recordList.isEmpty()) {
                            lastOffset = recordList.get(recordList.size() - 1).offset();
                            sourceSplit.setStartOffset(lastOffset + 1);
                        }
                        if (lastOffset >= sourceSplit.getEndOffset()) {
                            sourceSplit.setEndOffset(lastOffset);
                        }
                    }
                } catch (Exception e) {
                    completableFuture.completeExceptionally(e);
                }
                completableFuture.complete(null);
            });
        } catch (InterruptedException e) {
            throw new KafkaConnectorException(KafkaConnectorErrorCode.CONSUME_DATA_FAILED, e);
        }
        completableFuture.join();
    });
    if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
        // signal to the source that we have reached the end of the data.
        context.signalNoMoreElement();
    }
}
```

Below is my modified code, but some problems remain: in certain special scenarios, 10 seconds are still wasted.
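The cost of calling `join()` inside the loop can be demonstrated in isolation. The sketch below is hypothetical illustration code (the class name, pool sizes, and scaled-down timings are mine, not SeaTunnel's): each simulated poll blocks for 200 ms, standing in for an empty partition waiting out its poll timeout. Joined one by one, three polls take roughly the sum of the waits; gathered with `CompletableFuture.allOf`, they take roughly one poll interval.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration, not SeaTunnel code: contrasts joining each
// future inside the loop (the original pollNext pattern) with collecting
// the futures and joining once via CompletableFuture.allOf.
public class JoinVsAllOf {

    // Simulates one consumer task: an empty partition that blocks for the
    // whole poll timeout before returning nothing.
    static CompletableFuture<Void> pollTask(ExecutorService pool, long pollMillis) {
        return CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, pool);
    }

    // Original pattern: join() inside the loop, so the waits add up
    // (~ partitions * pollMillis even though a pool is available).
    static long joinInLoopMillis(int partitions, long pollMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(partitions);
        long start = System.nanoTime();
        for (int i = 0; i < partitions; i++) {
            pollTask(pool, pollMillis).join();
        }
        pool.shutdown();
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Gathered pattern: submit everything, then wait once for the slowest
    // task (~ pollMillis when the pool has enough threads).
    static long allOfMillis(int partitions, long pollMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(partitions);
        long start = System.nanoTime();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            futures.add(pollTask(pool, pollMillis));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        pool.shutdown();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("join in loop: " + joinInLoopMillis(3, 200) + " ms");
        System.out.println("allOf:        " + allOfMillis(3, 200) + " ms");
    }
}
```

With the real 10-second poll timeout and three splits, the same arithmetic gives the 20+ seconds per round described above.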
In addition, dynamic partition discovery is delayed by one minute.

Also, I do not know what these two lines of code mean:

```java
StringDeserializer stringDeserializer = new StringDeserializer();
stringDeserializer.configure(Maps.fromProperties(this.metadata.getProperties()), false);
```

Here is the modified `pollNext`:

```java
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
    if (!running) {
        Thread.sleep(THREAD_WAIT_TIME);
        return;
    }
    while (pendingPartitionsQueue.size() != 0) {
        sourceSplits.add(pendingPartitionsQueue.poll());
    }
    sourceSplits.forEach(sourceSplit ->
            consumerThreadMap.computeIfAbsent(sourceSplit.getTopicPartition(), s -> {
                KafkaConsumerThread thread = new KafkaConsumerThread(metadata);
                executorService.submit(thread);
                return thread;
            }));
    ArrayList<CompletableFuture<Void>> completableFutures = new ArrayList<>(sourceSplits.size());
    sourceSplits.forEach(sourceSplit -> {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        completableFutures.add(completableFuture);
        try {
            consumerThreadMap.get(sourceSplit.getTopicPartition()).getTasks().put(consumer -> {
                boolean flag = true;
                long now = System.currentTimeMillis();
                while (flag) {
                    try {
                        Set<TopicPartition> partitions = Sets.newHashSet(sourceSplit.getTopicPartition());
                        StringDeserializer stringDeserializer = new StringDeserializer();
                        stringDeserializer.configure(Maps.fromProperties(this.metadata.getProperties()), false);
                        consumer.assign(partitions);
                        if (sourceSplit.getStartOffset() >= 0) {
                            consumer.seek(sourceSplit.getTopicPartition(), sourceSplit.getStartOffset());
                        }
                        ConsumerRecords<byte[], byte[]> records =
                                consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
                        for (TopicPartition partition : partitions) {
                            List<ConsumerRecord<byte[], byte[]>> recordList = records.records(partition);
                            for (ConsumerRecord<byte[], byte[]> record : recordList) {
                                try {
                                    deserializationSchema.deserialize(record.value(), output);
                                } catch (Exception e) {
                                    if (this.messageFormatErrorHandleWay == MessageFormatErrorHandleWay.SKIP) {
                                        log.warn("Deserialize message failed, skip this message, message: {}",
                                                new String(record.value()));
                                        continue;
                                    }
                                    throw e;
                                }
                                if (Boundedness.BOUNDED.equals(context.getBoundedness())
                                        && record.offset() >= sourceSplit.getEndOffset()) {
                                    break;
                                }
                            }
                            long lastOffset = -1;
                            if (!recordList.isEmpty()) {
                                lastOffset = recordList.get(recordList.size() - 1).offset();
                                sourceSplit.setStartOffset(lastOffset + 1);
                            }
                            if (lastOffset >= sourceSplit.getEndOffset()) {
                                sourceSplit.setEndOffset(lastOffset);
                            }
                            if ((System.currentTimeMillis() - now) > 60000) {
                                flag = false;
                            }
                        }
                    } catch (Exception e) {
                        completableFuture.completeExceptionally(e);
                    }
                }
                completableFuture.complete(null);
            });
        } catch (InterruptedException e) {
            throw new KafkaConnectorException(KafkaConnectorErrorCode.CONSUME_DATA_FAILED, e);
        }
    });
    CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).join();
    if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
        // signal to the source that we have reached the end of the data.
        context.signalNoMoreElement();
    }
}
```

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
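A postscript on the one-minute partition-discovery delay mentioned above: it follows from the 60000 ms bound on the inner `while (flag)` loop, because `pollNext` can only drain `pendingPartitionsQueue` again after every split's loop has finished. The sketch below is a scaled-down, hypothetical model (class name, queue contents, and timings are mine; a 300 ms window stands in for the 60 s one), not SeaTunnel code.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of the partition-discovery delay in the modified
// pollNext: while the consume loop holds the reader for the whole window,
// a newly discovered split sits in the pending queue and is only drained
// after the window ends.
public class DiscoveryDelay {

    // Returns how long a split discovered `splitArrivalMillis` into the
    // window waits before the reader picks it up (~ windowMillis - arrival).
    static long pickupDelayMillis(long windowMillis, long splitArrivalMillis) {
        LinkedBlockingQueue<String> pendingPartitionsQueue = new LinkedBlockingQueue<>();
        // Enumerator thread: discovers a new partition mid-window.
        Thread enumerator = new Thread(() -> {
            try {
                Thread.sleep(splitArrivalMillis);
                pendingPartitionsQueue.put("new-split");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        enumerator.start();
        try {
            long start = System.currentTimeMillis();
            // Stands in for the inner `while (flag)` loop bounded by 60000 ms.
            while (System.currentTimeMillis() - start < windowMillis) {
                Thread.sleep(10); // stands in for consumer.poll(...)
            }
            // Only now does pollNext get to drain the queue again.
            pendingPartitionsQueue.take();
            long delay = System.currentTimeMillis() - start - splitArrivalMillis;
            enumerator.join();
            return delay;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }

    public static void main(String[] args) {
        // With the real 60 s window, a partition discovered just after the
        // loop starts waits close to the full minute before being consumed.
        System.out.println("pickup delay: " + pickupDelayMillis(300, 100) + " ms");
    }
}
```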
