This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-website.git
The following commit(s) were added to refs/heads/master by this push:
     new 95adc7bbdd1 [ecosystem]doris-kafka-connector adds poll data timeout processing method (#1121)
95adc7bbdd1 is described below
commit 95adc7bbdd1253c81d7de89d9c66cac097aea84a
Author: wudongliang <[email protected]>
AuthorDate: Tue Sep 24 17:53:09 2024 +0800
[ecosystem]doris-kafka-connector adds poll data timeout processing method (#1121)
# Versions
- [ ] dev
- [ ] 3.0
- [ ] 2.1
- [ ] 2.0
# Languages
- [ ] Chinese
- [ ] English
---
common_docs_zh/ecosystem/doris-kafka-connector.md | 33 ++++++++++++++++++++++
ecosystem/doris-kafka-connector.md | 34 ++++++++++++++++++++++-
2 files changed, 66 insertions(+), 1 deletion(-)
diff --git a/common_docs_zh/ecosystem/doris-kafka-connector.md b/common_docs_zh/ecosystem/doris-kafka-connector.md
index 303af0d40af..6a32ae5864f 100644
--- a/common_docs_zh/ecosystem/doris-kafka-connector.md
+++ b/common_docs_zh/ecosystem/doris-kafka-connector.md
@@ -350,3 +350,36 @@ Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with sch
1. Replace `org.apache.kafka.connect.json.JsonConverter` with `org.apache.kafka.connect.storage.StringConverter`
2. If the startup mode is **Standalone** mode, change `value.converter.schemas.enable` or `key.converter.schemas.enable` in config/connect-standalone.properties to false; if the startup mode is **Distributed** mode, change `value.converter.schemas.enable` or `key.converter.schemas.enable` in config/connect-distributed.properties to false
+
+**2. Consumption times out and the consumer is kicked out of the consumer group:**
+
+```
+org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
+    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1318)
+    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:1127)
+    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:1093)
+    at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1590)
+    at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitAsync(WorkerSinkTask.java:361)
+    at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:376)
+    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:467)
+    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:381)
+    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:221)
+    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
+    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
+    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
+    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
+    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
+    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
+    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
+    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
+    at java.base/java.lang.Thread.run(Thread.java:833)
+```
+
+**Solution:**
+
+Increase `max.poll.interval.ms` in Kafka according to the scenario; the default value is `300000` (5 minutes).
+- If started in Standalone mode, add the `max.poll.interval.ms` and `consumer.max.poll.interval.ms` parameters to the config/connect-standalone.properties configuration file and set their values.
+- If started in Distributed mode, add the `max.poll.interval.ms` and `consumer.max.poll.interval.ms` parameters to the config/connect-distributed.properties configuration file and set their values.
+
+After adjusting the parameters, restart Kafka Connect.
+
diff --git a/ecosystem/doris-kafka-connector.md b/ecosystem/doris-kafka-connector.md
index 87422d434c3..1617857b926 100644
--- a/ecosystem/doris-kafka-connector.md
+++ b/ecosystem/doris-kafka-connector.md
@@ -349,4 +349,36 @@ This is because using the `org.apache.kafka.connect.json.JsonConverter` converte
**Two solutions, choose one:**
1. Replace `org.apache.kafka.connect.json.JsonConverter` with `org.apache.kafka.connect.storage.StringConverter`
2. If the startup mode is **Standalone** mode, change `value.converter.schemas.enable` or `key.converter.schemas.enable` in config/connect-standalone.properties to false;
-   If the startup mode is **Distributed** mode, change `value.converter.schemas.enable` or `key.converter.schemas.enable` in config/connect-distributed.properties to false
\ No newline at end of file
+   If the startup mode is **Distributed** mode, change `value.converter.schemas.enable` or `key.converter.schemas.enable` in config/connect-distributed.properties to false
+
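For reference, the second option above amounts to two lines in the Kafka Connect worker properties file. A minimal sketch of the relevant part of config/connect-standalone.properties (the keys are standard Kafka Connect worker settings; the same lines apply to config/connect-distributed.properties):

```properties
# Tell the JsonConverter not to expect a schema/payload envelope,
# so plain JSON records are accepted as-is.
key.converter.schemas.enable=false
value.converter.schemas.enable=false
```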
+**2. Consumption times out and the consumer is kicked out of the consumer group:**
+
+```
+org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
+    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1318)
+    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:1127)
+    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:1093)
+    at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1590)
+    at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitAsync(WorkerSinkTask.java:361)
+    at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:376)
+    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:467)
+    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:381)
+    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:221)
+    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
+    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
+    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
+    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
+    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
+    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
+    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
+    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
+    at java.base/java.lang.Thread.run(Thread.java:833)
+```
+
+**Solution:**
+
+Increase `max.poll.interval.ms` in Kafka according to the scenario; the default value is `300000` (5 minutes).
+- If started in Standalone mode, add the `max.poll.interval.ms` and `consumer.max.poll.interval.ms` parameters to the config/connect-standalone.properties configuration file and set their values.
+- If started in Distributed mode, add the `max.poll.interval.ms` and `consumer.max.poll.interval.ms` parameters to the config/connect-distributed.properties configuration file and set their values.
+
+After adjusting the parameters, restart Kafka Connect.
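As a concrete illustration of the fix, the worker properties file could gain the following lines. The keys are standard Kafka Connect settings (the `consumer.` prefix passes the override down to the consumers created for sink tasks); the value of 600000 ms (10 minutes) is only an assumed example, not a recommendation:

```properties
# Hypothetical example values: allow up to 10 minutes between polls
# before the consumer is considered failed and rebalanced out.
max.poll.interval.ms=600000
consumer.max.poll.interval.ms=600000
```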
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]