This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-website.git


The following commit(s) were added to refs/heads/master by this push:
     new 95adc7bbdd1 [ecosystem]doris-kafka-connector adds poll data timeout processing method (#1121)
95adc7bbdd1 is described below

commit 95adc7bbdd1253c81d7de89d9c66cac097aea84a
Author: wudongliang <[email protected]>
AuthorDate: Tue Sep 24 17:53:09 2024 +0800

    [ecosystem]doris-kafka-connector adds poll data timeout processing method (#1121)
    
    # Versions
    
    - [ ] dev
    - [ ] 3.0
    - [ ] 2.1
    - [ ] 2.0
    
    # Languages
    
    - [ ] Chinese
    - [ ] English
---
 common_docs_zh/ecosystem/doris-kafka-connector.md | 33 ++++++++++++++++++++++
 ecosystem/doris-kafka-connector.md                | 34 ++++++++++++++++++++++-
 2 files changed, 66 insertions(+), 1 deletion(-)

diff --git a/common_docs_zh/ecosystem/doris-kafka-connector.md b/common_docs_zh/ecosystem/doris-kafka-connector.md
index 303af0d40af..6a32ae5864f 100644
--- a/common_docs_zh/ecosystem/doris-kafka-connector.md
+++ b/common_docs_zh/ecosystem/doris-kafka-connector.md
@@ -350,3 +350,36 @@ Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with sch
   1. Replace `org.apache.kafka.connect.json.JsonConverter` with `org.apache.kafka.connect.storage.StringConverter`
   2. If the startup mode is **Standalone**, set `value.converter.schemas.enable` or `key.converter.schemas.enable` in config/connect-standalone.properties to false;
     If the startup mode is **Distributed**, set `value.converter.schemas.enable` or `key.converter.schemas.enable` in config/connect-distributed.properties to false
+
+**2. Consumption times out and the consumer is kicked out of the consumer group:**
+
+```
+org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
+        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1318)
+        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:1127)
+        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:1093)
+        at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1590)
+        at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitAsync(WorkerSinkTask.java:361)
+        at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:376)
+        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:467)
+        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:381)
+        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:221)
+        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
+        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
+        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
+        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
+        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
+        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
+        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
+        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
+        at java.base/java.lang.Thread.run(Thread.java:833)
+```
+
+**Solution:**
+
+Increase `max.poll.interval.ms` in Kafka to suit your workload. The default value is `300000` (5 minutes).
+- In Standalone mode, add the `max.poll.interval.ms` and `consumer.max.poll.interval.ms` parameters to config/connect-standalone.properties and set their values.
+- In Distributed mode, add the `max.poll.interval.ms` and `consumer.max.poll.interval.ms` parameters to config/connect-distributed.properties and set their values.
+
+After adjusting the parameters, restart kafka-connect.
+
diff --git a/ecosystem/doris-kafka-connector.md b/ecosystem/doris-kafka-connector.md
index 87422d434c3..1617857b926 100644
--- a/ecosystem/doris-kafka-connector.md
+++ b/ecosystem/doris-kafka-connector.md
@@ -349,4 +349,36 @@ This is because using the `org.apache.kafka.connect.json.JsonConverter` converte
 **Two solutions, choose one:**
   1. Replace `org.apache.kafka.connect.json.JsonConverter` with `org.apache.kafka.connect.storage.StringConverter`
   2. If the startup mode is **Standalone** mode, change `value.converter.schemas.enable` or `key.converter.schemas.enable` in config/connect-standalone.properties to false;
-   If the startup mode is **Distributed** mode, change `value.converter.schemas.enable` or `key.converter.schemas.enable` in config/connect-distributed.properties to false
\ No newline at end of file
+   If the startup mode is **Distributed** mode, change `value.converter.schemas.enable` or `key.converter.schemas.enable` in config/connect-distributed.properties to false
+
+**2. Consumption times out and the consumer is kicked out of the consumer group:**
+
+```
+org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
+        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1318)
+        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:1127)
+        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:1093)
+        at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1590)
+        at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitAsync(WorkerSinkTask.java:361)
+        at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:376)
+        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:467)
+        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:381)
+        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:221)
+        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
+        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
+        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
+        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
+        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
+        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
+        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
+        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
+        at java.base/java.lang.Thread.run(Thread.java:833)
+```
+
+**Solution:**
+
+Increase `max.poll.interval.ms` in Kafka to suit your workload. The default value is `300000` (5 minutes).
+- In Standalone mode, add the `max.poll.interval.ms` and `consumer.max.poll.interval.ms` parameters to config/connect-standalone.properties and set their values.
+- In Distributed mode, add the `max.poll.interval.ms` and `consumer.max.poll.interval.ms` parameters to config/connect-distributed.properties and set their values.
+
+After adjusting the parameters, restart kafka-connect.
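
For illustration only (not part of the commit): the solution in the diff above could translate into a worker properties fragment like the sketch below. The value `600000` (10 minutes) is an assumed example, not a recommendation from the commit; pick a value that exceeds the longest time a single batch is expected to take in your deployment.

```properties
# Sketch for config/connect-standalone.properties (Standalone mode)
# or config/connect-distributed.properties (Distributed mode).
# 600000 ms (10 minutes) is an assumed illustrative value; the default is 300000 ms.
max.poll.interval.ms=600000
# The "consumer." prefix passes the setting to the consumers created for sink tasks.
consumer.max.poll.interval.ms=600000
```

The new values take effect only after kafka-connect is restarted, as the commit notes.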

