This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 09d12fc40e [Feature][kafka] Add arg poll.timeout for interval poll 
messages (#7606)
09d12fc40e is described below

commit 09d12fc40e478f935b31f0a591759a7cdc006533
Author: CosmosNi <[email protected]>
AuthorDate: Tue Sep 10 09:56:12 2024 +0800

    [Feature][kafka] Add arg poll.timeout for interval poll messages (#7606)
---
 docs/en/connector-v2/source/kafka.md               | 41 +++++++++++-----------
 docs/zh/connector-v2/source/Kafka.md               |  3 +-
 .../connectors/seatunnel/kafka/config/Config.java  |  6 ++++
 .../kafka/source/KafkaPartitionSplitReader.java    |  6 ++--
 .../seatunnel/kafka/source/KafkaSourceConfig.java  |  3 ++
 5 files changed, 36 insertions(+), 23 deletions(-)

diff --git a/docs/en/connector-v2/source/kafka.md 
b/docs/en/connector-v2/source/kafka.md
index e9259fae48..90c183c2c1 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -32,26 +32,27 @@ They can be downloaded via install-plugin.sh or from the 
Maven central repositor
 
 ## Source Options
 
-| Name                                | Type                                   
                                     | Required | Default                  | 
Description                                                                     
                                                                                
                                                                                
                                                                                
                 [...]
-|-------------------------------------|-----------------------------------------------------------------------------|----------|--------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
-| topic                               | String                                 
                                     | Yes      | -                        | 
Topic name(s) to read data from when the table is used as source. It also 
supports topic list for source by separating topic by comma like 
'topic-1,topic-2'.                                                              
                                                                                
                                      [...]
-| table_list                          | Map                                    
                                     | No       | -                        | 
Topic list config You can configure only one `table_list` and one `topic` at 
the same time                                                                   
                                                                                
                                                                                
                    [...]
-| bootstrap.servers                   | String                                 
                                     | Yes      | -                        | 
Comma separated list of Kafka brokers.                                          
                                                                                
                                                                                
                                                                                
                 [...]
-| pattern                             | Boolean                                
                                     | No       | false                    | If 
`pattern` is set to `true`,the regular expression for a pattern of topic names 
to read from. All topics in clients with names that match the specified regular 
expression will be subscribed by the consumer.                                  
                                                                                
               [...]
-| consumer.group                      | String                                 
                                     | No       | SeaTunnel-Consumer-Group | 
`Kafka consumer group id`, used to distinguish different consumer groups.       
                                                                                
                                                                                
                                                                                
                 [...]
-| commit_on_checkpoint                | Boolean                                
                                     | No       | true                     | If 
true the consumer's offset will be periodically committed in the background.    
                                                                                
                                                                                
                                                                                
              [...]
-| kafka.config                        | Map                                    
                                     | No       | -                        | In 
addition to the above necessary parameters that must be specified by the `Kafka 
consumer` client, users can also specify multiple `consumer` client 
non-mandatory parameters, covering [all consumer parameters specified in the 
official Kafka 
document](https://kafka.apache.org/documentation.html#consumerconfigs).         
              [...]
-| schema                              | Config                                 
                                     | No       | -                        | 
The structure of the data, including field names and field types.               
                                                                                
                                                                                
                                                                                
                 [...]
-| format                              | String                                 
                                     | No       | json                     | 
Data format. The default format is json. Optional text format, canal_json, 
debezium_json, maxwell_json, ogg_json, avro and protobuf. If you use json or 
text format. The default field separator is ", ". If you customize the 
delimiter, add the "field_delimiter" option.If you use canal format, please 
refer to [canal-json](../formats/cana [...]
-| format_error_handle_way             | String                                 
                                     | No       | fail                     | 
The processing method of data format error. The default value is fail, and the 
optional value is (fail, skip). When fail is selected, data format error will 
block and an exception will be thrown. When skip is selected, data format error 
will skip this line data.                                                       
                    [...]
-| field_delimiter                     | String                                 
                                     | No       | ,                        | 
Customize the field delimiter for data format.                                  
                                                                                
                                                                                
                                                                                
                 [...]
+| Name                                | Type                                   
                                   | Required | Default                  | 
Description                                                                     
                                                                                
                                                                                
                                                                                
                   [...]
+|-------------------------------------|---------------------------------------------------------------------------|----------|--------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+| topic                               | String                                 
                                   | Yes      | -                        | 
Topic name(s) to read data from when the table is used as source. It also 
supports topic list for source by separating topic by comma like 
'topic-1,topic-2'.                                                              
                                                                                
                                        [...]
+| table_list                          | Map                                    
                                   | No       | -                        | 
Topic list config You can configure only one `table_list` and one `topic` at 
the same time                                                                   
                                                                                
                                                                                
                      [...]
+| bootstrap.servers                   | String                                 
                                   | Yes      | -                        | 
Comma separated list of Kafka brokers.                                          
                                                                                
                                                                                
                                                                                
                   [...]
+| pattern                             | Boolean                                
                                   | No       | false                    | If 
`pattern` is set to `true`,the regular expression for a pattern of topic names 
to read from. All topics in clients with names that match the specified regular 
expression will be subscribed by the consumer.                                  
                                                                                
                 [...]
+| consumer.group                      | String                                 
                                   | No       | SeaTunnel-Consumer-Group | 
`Kafka consumer group id`, used to distinguish different consumer groups.       
                                                                                
                                                                                
                                                                                
                   [...]
+| commit_on_checkpoint                | Boolean                                
                                   | No       | true                     | If 
true the consumer's offset will be periodically committed in the background.    
                                                                                
                                                                                
                                                                                
                [...]
+| poll.timeout                        | Long                                   
                                    | No       | 10000                    | The 
interval (in milliseconds) for polling messages.                                
                                                                                
                                                                                
                                                                                
                    [...]
+| kafka.config                        | Map                                    
                                   | No       | -                        | In 
addition to the above necessary parameters that must be specified by the `Kafka 
consumer` client, users can also specify multiple `consumer` client 
non-mandatory parameters, covering [all consumer parameters specified in the 
official Kafka 
document](https://kafka.apache.org/documentation.html#consumerconfigs).         
                [...]
+| schema                              | Config                                 
                                   | No       | -                        | The 
structure of the data, including field names and field types.                   
                                                                                
                                                                                
                                                                                
               [...]
+| format                              | String                                 
                                   | No       | json                     | Data 
format. The default format is json. Optional text format, canal_json, 
debezium_json, maxwell_json, ogg_json, avro and protobuf. If you use json or 
text format. The default field separator is ", ". If you customize the 
delimiter, add the "field_delimiter" option.If you use canal format, please 
refer to [canal-json](../formats/canal- [...]
+| format_error_handle_way             | String                                 
                                   | No       | fail                     | The 
processing method of data format error. The default value is fail, and the 
optional value is (fail, skip). When fail is selected, data format error will 
block and an exception will be thrown. When skip is selected, data format error 
will skip this line data.                                                       
                      [...]
+| field_delimiter                     | String                                 
                                   | No       | ,                        | 
Customize the field delimiter for data format.                                  
                                                                                
                                                                                
                                                                                
                   [...]
 | start_mode                          | 
StartMode[earliest],[group_offsets],[latest],[specific_offsets],[timestamp] | 
No       | group_offsets            | The initial consumption pattern of 
consumers.                                                                      
                                                                                
                                                                                
                                                              [...]
-| start_mode.offsets                  | Config                                 
                                     | No       | -                        | 
The offset required for consumption mode to be specific_offsets.                
                                                                                
                                                                                
                                                                                
                 [...]
-| start_mode.timestamp                | Long                                   
                                     | No       | -                        | 
The time required for consumption mode to be "timestamp".                       
                                                                                
                                                                                
                                                                                
                 [...]
-| partition-discovery.interval-millis | Long                                   
                                     | No       | -1                       | 
The interval for dynamically discovering topics and partitions.                 
                                                                                
                                                                                
                                                                                
                 [...]
-| common-options                      |                                        
                                     | No       | -                        | 
Source plugin common parameters, please refer to [Source Common 
Options](../source-common-options.md) for details                               
                                                                                
                                                                                
                                 [...]
-| protobuf_message_name               | String                                 
                                     | No       | -                        | 
Effective when the format is set to protobuf, specifies the Message name        
                                                                                
                                                                                
                                                                                
                 [...]
-| protobuf_schema                     | String                                 
                                     | No       | -                        | 
Effective when the format is set to protobuf, specifies the Schema definition   
                                                                                
                                                                                
                                                                                
                 [...]
+| start_mode.offsets                  | Config                                 
                                   | No       | -                        | The 
offset required for consumption mode to be specific_offsets.                    
                                                                                
                                                                                
                                                                                
               [...]
+| start_mode.timestamp                | Long                                   
                                   | No       | -                        | The 
time required for consumption mode to be "timestamp".                           
                                                                                
                                                                                
                                                                                
               [...]
+| partition-discovery.interval-millis | Long                                   
                                   | No       | -1                       | The 
interval for dynamically discovering topics and partitions.                     
                                                                                
                                                                                
                                                                                
               [...]
+| common-options                      |                                        
                                   | No       | -                        | 
Source plugin common parameters, please refer to [Source Common 
Options](../source-common-options.md) for details                               
                                                                                
                                                                                
                                   [...]
+| protobuf_message_name               | String                                 
                                   | No       | -                        | 
Effective when the format is set to protobuf, specifies the Message name        
                                                                                
                                                                                
                                                                                
                   [...]
+| protobuf_schema                     | String                                 
                                   | No       | -                        | 
Effective when the format is set to protobuf, specifies the Schema definition   
                                                                                
                                                                                
                                                                                
                   [...]
 
 ## Task Example
 
@@ -291,4 +292,4 @@ source {
     result_table_name = "kafka_table"
   }
 }
-```
\ No newline at end of file
+```
diff --git a/docs/zh/connector-v2/source/Kafka.md 
b/docs/zh/connector-v2/source/Kafka.md
index 8f65e92e92..44e2721556 100644
--- a/docs/zh/connector-v2/source/Kafka.md
+++ b/docs/zh/connector-v2/source/Kafka.md
@@ -40,6 +40,7 @@
 | pattern                             | Boolean                             | 
否    | false                    | 如果 `pattern` 设置为 `true`,则会使用指定的正则表达式匹配并订阅主题。  
                                                                                
                                                                                
                                                                                
                          |
 | consumer.group                      | String                              | 
否    | SeaTunnel-Consumer-Group | `Kafka 消费者组 ID`,用于区分不同的消费者组。                  
                                                                                
                                                                                
                                                                                
                          |
 | commit_on_checkpoint                | Boolean                             | 
否    | true                     | 如果为 true,消费者的偏移量将会定期在后台提交。                    
                                                                                
                                                                                
                                                                                
                          |
+| poll.timeout                        | Long                                 | 
否    | 10000                    | kafka主动拉取时间间隔(毫秒)。                            
                                                                                
                                                                                
                                                                                
                              |
 | kafka.config                        | Map                                 | 
否    | -                        | 除了上述必要参数外,用户还可以指定多个非强制的消费者客户端参数,覆盖 [Kafka 
官方文档](https://kafka.apache.org/documentation.html#consumerconfigs) 中指定的所有消费者参数。 
                                                                                
                                                                                
                              |
 | schema                              | Config                              | 
否    | -                        | 数据结构,包括字段名称和字段类型。                             
                                                                                
                                                                                
                                                                                
                          |
 | format                              | String                              | 
否    | json                     | 数据格式。默认格式为 json。可选格式包括 text, canal_json, 
debezium_json, ogg_json, maxwell_json, avro 和 protobuf。默认字段分隔符为 ", 
"。如果自定义分隔符,添加 "field_delimiter" 选项。如果使用 canal 格式,请参考 
[canal-json](../formats/canal-json.md) 了解详细信息。如果使用 debezium 格式,请参考 
[debezium-json](../formats/debezium-json.md)。一些Format的详细信息请参考 
[formats](../formats) |
@@ -285,4 +286,4 @@ source {
     result_table_name = "kafka_table"
   }
 }
-```
\ No newline at end of file
+```
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index a907c9bc21..293821e0ed 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -162,6 +162,12 @@ public class Config {
                     .withDescription(
                             "The interval for dynamically discovering topics 
and partitions.");
 
+    public static final Option<Long> KEY_POLL_TIMEOUT =
+            Options.key("poll.timeout")
+                    .longType()
+                    .defaultValue(10000L)
+                    .withDescription("The interval for poll message");
+
     public static final Option<MessageFormatErrorHandleWay> 
MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION =
             Options.key("format_error_handle_way")
                     .enumType(MessageFormatErrorHandleWay.class)
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaPartitionSplitReader.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaPartitionSplitReader.java
index 8bca82999c..d7f0dd0d8d 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaPartitionSplitReader.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaPartitionSplitReader.java
@@ -62,7 +62,6 @@ public class KafkaPartitionSplitReader
 
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaPartitionSplitReader.class);
 
-    private static final long POLL_TIMEOUT = 10000L;
     private static final String CLIENT_ID_PREFIX = "seatunnel";
     private final KafkaSourceConfig kafkaSourceConfig;
 
@@ -74,6 +73,8 @@ public class KafkaPartitionSplitReader
 
     private final Set<String> emptySplits = new HashSet<>();
 
+    private final long pollTimeout;
+
     public KafkaPartitionSplitReader(
             KafkaSourceConfig kafkaSourceConfig, SourceReader.Context context) 
{
         this.kafkaSourceConfig = kafkaSourceConfig;
@@ -81,13 +82,14 @@ public class KafkaPartitionSplitReader
         this.stoppingOffsets = new HashMap<>();
         this.groupId =
                 
kafkaSourceConfig.getProperties().getProperty(ConsumerConfig.GROUP_ID_CONFIG);
+        this.pollTimeout = kafkaSourceConfig.getPollTimeout();
     }
 
     @Override
     public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws 
IOException {
         ConsumerRecords<byte[], byte[]> consumerRecords;
         try {
-            consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
+            consumerRecords = consumer.poll(Duration.ofMillis(pollTimeout));
         } catch (WakeupException | IllegalStateException e) {
             // IllegalStateException will be thrown if the consumer is not 
assigned any partitions.
             // This happens if all assigned partitions are invalid or empty 
(starting offset >=
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
index 960a018402..0f645d7218 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
@@ -70,6 +70,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIEL
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KEY_POLL_TIMEOUT;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PROTOBUF_MESSAGE_NAME;
@@ -90,6 +91,7 @@ public class KafkaSourceConfig implements Serializable {
     @Getter private final long discoveryIntervalMillis;
     @Getter private final MessageFormatErrorHandleWay 
messageFormatErrorHandleWay;
     @Getter private final String consumerGroup;
+    @Getter private final long pollTimeout;
 
     public KafkaSourceConfig(ReadonlyConfig readonlyConfig) {
         this.bootstrap = readonlyConfig.get(BOOTSTRAP_SERVERS);
@@ -99,6 +101,7 @@ public class KafkaSourceConfig implements Serializable {
         this.discoveryIntervalMillis = 
readonlyConfig.get(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS);
         this.messageFormatErrorHandleWay =
                 readonlyConfig.get(MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION);
+        this.pollTimeout = readonlyConfig.get(KEY_POLL_TIMEOUT);
         this.consumerGroup = readonlyConfig.get(CONSUMER_GROUP);
     }
 

Reply via email to